This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e0fb3015da0 [cleanup] Refactored Backoff to be more consistent and intuitive to use (#25278)
e0fb3015da0 is described below
commit e0fb3015da0eed80fa62b7466e6052f83bf8e5ae
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 9 03:18:10 2026 -0700
[cleanup] Refactored Backoff to be more consistent and intuitive to use (#25278)
---
.../pulsar/broker/resources/BaseResources.java | 14 +-
.../pulsar/broker/service/AbstractReplicator.java | 11 +-
.../service/PulsarMetadataEventSynchronizer.java | 9 +-
.../pulsar/broker/service/TopicListService.java | 14 +-
.../PersistentDispatcherMultipleConsumers.java | 21 +-
...rsistentDispatcherMultipleConsumersClassic.java | 11 +-
.../PersistentDispatcherSingleActiveConsumer.java | 12 +-
.../service/persistent/PersistentReplicator.java | 9 +-
.../pendingack/impl/PendingAckHandleImpl.java | 8 +-
.../common/naming/NamespaceBundleFactory.java | 6 +-
.../pulsar/client/impl/ConnectionHandlerTest.java | 25 ++-
.../apache/pulsar/client/impl/RetryUtilTest.java | 23 +--
.../client/impl/BinaryProtoLookupService.java | 12 +-
.../pulsar/client/impl/ConnectionHandler.java | 4 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 44 ++--
.../apache/pulsar/client/impl/ProducerImpl.java | 13 +-
.../pulsar/client/impl/PulsarClientImpl.java | 13 +-
.../client/impl/PulsarServiceNameResolver.java | 14 +-
.../pulsar/client/impl/TopicListWatcher.java | 15 +-
.../client/impl/TransactionMetaStoreHandler.java | 43 ++--
.../org/apache/pulsar/client/util/RetryUtil.java | 6 +-
.../pulsar/client/impl/ConsumerImplTest.java | 4 +-
.../org/apache/pulsar/common/util/Backoff.java | 226 +++++++++++++++------
.../apache/pulsar/common/util/BackoffBuilder.java | 65 ------
.../org/apache/pulsar/common/util/BackoffTest.java | 143 ++++++-------
.../pulsar/metadata/api/MetadataCacheConfig.java | 24 +--
.../metadata/cache/impl/MetadataCacheImpl.java | 14 +-
.../coordination/impl/ResourceLockImpl.java | 8 +-
.../apache/pulsar/metadata/MetadataCacheTest.java | 57 +++---
29 files changed, 449 insertions(+), 419 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index f31e5a6b78a..636a8db4856 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Joiner;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -33,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -60,8 +62,10 @@ public class BaseResources<T> {
public BaseResources(MetadataStore store, Class<T> clazz, int
operationTimeoutSec) {
this.store = store;
this.cache = store.getMetadataCache(clazz,
MetadataCacheConfig.builder()
-
.retryBackoff(MetadataCacheConfig.DEFAULT_RETRY_BACKOFF_BUILDER.setMandatoryStop(operationTimeoutSec,
- TimeUnit.SECONDS))
+ .retryBackoff(Backoff.builder()
+ .initialDelay(Duration.ofMillis(5))
+ .maxBackoff(Duration.ofSeconds(3))
+
.mandatoryStop(Duration.ofSeconds(operationTimeoutSec)))
.build());
this.operationTimeoutSec = operationTimeoutSec;
}
@@ -69,8 +73,10 @@ public class BaseResources<T> {
public BaseResources(MetadataStore store, TypeReference<T> typeRef, int
operationTimeoutSec) {
this.store = store;
this.cache = store.getMetadataCache(typeRef,
MetadataCacheConfig.builder()
-
.retryBackoff(MetadataCacheConfig.DEFAULT_RETRY_BACKOFF_BUILDER.setMandatoryStop(operationTimeoutSec,
- TimeUnit.SECONDS))
+ .retryBackoff(Backoff.builder()
+ .initialDelay(Duration.ofMillis(5))
+ .maxBackoff(Duration.ofSeconds(3))
+
.mandatoryStop(Duration.ofSeconds(operationTimeoutSec)))
.build());
this.operationTimeoutSec = operationTimeoutSec;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index c7a36ad1b21..3477ab793f3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -71,8 +71,7 @@ public abstract class AbstractReplicator implements
Replicator {
protected final int producerQueueSize;
protected final ProducerBuilder<byte[]> producerBuilder;
- protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS,
1, TimeUnit.MINUTES, 0,
- TimeUnit.MILLISECONDS);
+ protected final Backoff backOff = Backoff.create();
protected final String replicatorPrefix;
@@ -209,7 +208,7 @@ public abstract class AbstractReplicator implements
Replicator {
}).exceptionally(ex -> {
Pair<Boolean, State> setDisconnectedRes =
compareSetAndGetState(State.Starting, State.Disconnected);
if (setDisconnectedRes.getLeft()) {
- long waitTimeMs = backOff.next();
+ long waitTimeMs = backOff.next().toMillis();
log.warn("[{}] Failed to create remote producer ({}), retrying
in {} s",
replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
// BackOff before retrying
@@ -238,7 +237,7 @@ public abstract class AbstractReplicator implements
Replicator {
* If we start a producer immediately, we will get a conflict
producer(same name producer) registered error.
*/
protected void delayStartProducerAfterDisconnected() {
- long waitTimeMs = backOff.next();
+ long waitTimeMs = backOff.next().toMillis();
if (log.isDebugEnabled()) {
log.debug(
"[{}] waiting for producer to close before attempting to
reconnect, retrying in {} s",
@@ -364,7 +363,7 @@ public abstract class AbstractReplicator implements
Replicator {
* Nit: The better solution is creating a {@link
CompletableFuture} to trace the in-progress
* creation and call
"inProgressCreationFuture.thenApply(closeProducer())".
*/
- long waitTimeMs = backOff.next();
+ long waitTimeMs = backOff.next().toMillis();
brokerService.executor().schedule(() ->
closeProducerAsync(true),
waitTimeMs, TimeUnit.MILLISECONDS);
} else {
@@ -415,7 +414,7 @@ public abstract class AbstractReplicator implements
Replicator {
return future.thenRun(() -> {
actionAfterClosed.run();
}).exceptionally(ex -> {
- long waitTimeMs = backOff.next();
+ long waitTimeMs = backOff.next().toMillis();
log.warn(
"[{}] Exception: '{}' occurred while trying to close the
producer. Replicator state: {}."
+ " Retrying again in {} s.",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
index a5fac333ae5..0eee0c25ff3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
@@ -65,8 +65,7 @@ public class PulsarMetadataEventSynchronizer implements
MetadataEventSynchronize
private volatile State state;
public static final String SUBSCRIPTION_NAME = "metadata-syncer";
private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
- protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS,
1, TimeUnit.MINUTES, 0,
- TimeUnit.MILLISECONDS);
+ protected final Backoff backOff = Backoff.create();
private volatile CompletableFuture<Void> closeFuture;
public enum State {
@@ -166,7 +165,7 @@ public class PulsarMetadataEventSynchronizer implements
MetadataEventSynchronize
});
}
}).exceptionally(ex -> {
- long waitTimeMs = backOff.next();
+ long waitTimeMs = backOff.next().toMillis();
log.warn("[{}] Failed to create producer ({}), retrying in {}
s", topicName, ex.getMessage(),
waitTimeMs / 1000.0);
// BackOff before retrying
@@ -238,7 +237,7 @@ public class PulsarMetadataEventSynchronizer implements
MetadataEventSynchronize
});
}
}).exceptionally(ex -> {
- long waitTimeMs = backOff.next();
+ long waitTimeMs = backOff.next().toMillis();
log.warn("[{}] Failed to create consumer ({}), retrying in {} s",
topicName, ex.getMessage(),
waitTimeMs / 1000.0);
// BackOff before retrying
@@ -318,7 +317,7 @@ public class PulsarMetadataEventSynchronizer implements
MetadataEventSynchronize
return;
}
// Retry.
- long waitTimeMs = backOff.next();
+ long waitTimeMs = backOff.next().toMillis();
log.warn("[{}] Exception: '{}' occurred while trying to close the
{}. Retrying again in {} s.",
topicName, ex.getMessage(),
asyncCloseable.getClass().getSimpleName(), waitTimeMs / 1000.0, ex);
brokerService.executor().schedule(() ->
closeResource(asyncCloseable, future), waitTimeMs,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
index 262c734ea95..eadb1f3969e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -320,10 +321,9 @@ public class TopicListService {
.concurrencyLevel(1)
.build();
this.topicResources = pulsar.getPulsarResources().getTopicResources();
- this.retryBackoff = new Backoff(
- 100, TimeUnit.MILLISECONDS,
- 25, TimeUnit.SECONDS,
- 0, TimeUnit.MILLISECONDS);
+ this.retryBackoff = Backoff.builder()
+ .maxBackoff(Duration.ofSeconds(25))
+ .build();
}
public void inactivate() {
@@ -471,7 +471,7 @@ public class TopicListService {
if (connection.isActive() && (unwrappedException instanceof
AsyncSemaphore.PermitAcquireTimeoutException
|| unwrappedException instanceof
AsyncSemaphore.PermitAcquireQueueFullException)) {
// retry with backoff if permit acquisition fails due to
timeout or queue full
- long retryAfterMillis = this.retryBackoff.next();
+ long retryAfterMillis =
this.retryBackoff.next().toMillis();
log.info("[{}] {} when initializing topic list watcher
watcherId={} for namespace {}. "
+ "Retrying in {} " + "ms.", connection,
unwrappedException.getMessage(), watcherId,
namespace, retryAfterMillis);
@@ -568,7 +568,7 @@ public class TopicListService {
&& (unwrappedException instanceof
AsyncSemaphore.PermitAcquireTimeoutException
|| unwrappedException instanceof
AsyncSemaphore.PermitAcquireQueueFullException)) {
// retry with backoff if permit acquisition fails due to
timeout or queue full
- long retryAfterMillis = this.retryBackoff.next();
+ long retryAfterMillis =
this.retryBackoff.next().toMillis();
log.info("[{}] {} when updating topic list watcher
watcherId={} for namespace {}. Retrying in {} "
+ "ms.", connection,
unwrappedException.getMessage(), watcherId, namespace,
retryAfterMillis);
@@ -687,7 +687,7 @@ public class TopicListService {
// stop retrying and complete successfully
return CompletableFuture.completedFuture(null);
}
- long retryDelay = retryBackoff.next();
+ long retryDelay = retryBackoff.next().toMillis();
retryCount.incrementAndGet();
log.info("[{}] Cannot acquire direct memory tokens for sending {}.
Retry {} in {} ms. {}", connection,
operationName, retryCount.get(), retryDelay,
t.getMessage());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4f33e3e379b..a88fbf863eb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -23,6 +23,7 @@ import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -165,14 +166,14 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractPersistentDis
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer,
this::addEntryToReplay, subscription);
ServiceConfiguration serviceConfiguration =
topic.getBrokerService().pulsar().getConfiguration();
- this.readFailureBackoff = new Backoff(
-
serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
- TimeUnit.MILLISECONDS,
- 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
- retryBackoff = new Backoff(
-
serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
- serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs(),
TimeUnit.MILLISECONDS,
- 0, TimeUnit.MILLISECONDS);
+ this.readFailureBackoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(serviceConfiguration
+ .getDispatcherReadFailureBackoffInitialTimeInMs()))
+ .build();
+ retryBackoff = Backoff.builder()
+
.initialDelay(Duration.ofMillis(serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs()))
+
.maxBackoff(Duration.ofMillis(serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs()))
+ .build();
}
@Override
@@ -515,7 +516,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractPersistentDis
}
protected synchronized void reScheduleReadWithBackoff() {
- reScheduleReadInMs(retryBackoff.next());
+ reScheduleReadInMs(retryBackoff.next().toMillis());
}
protected void reScheduleReadInMs(long readAfterMs) {
@@ -1000,7 +1001,7 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractPersistentDis
public synchronized void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
ReadType readType = (ReadType) ctx;
- long waitTimeMillis = readFailureBackoff.next();
+ long waitTimeMillis = readFailureBackoff.next().toMillis();
// Do not keep reading more entries if the cursor is already closed.
if (exception instanceof
ManagedLedgerException.CursorAlreadyClosedException) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
index 0746b7215b1..d828f338132 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
@@ -22,6 +22,7 @@ import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -158,10 +159,10 @@ public class PersistentDispatcherMultipleConsumersClassic
extends AbstractPersis
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer,
this::addMessageToReplay, subscription);
- this.readFailureBackoff = new Backoff(
-
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
- TimeUnit.MILLISECONDS,
- 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+ this.readFailureBackoff = Backoff.builder()
+
.initialDelay(Duration.ofMillis(topic.getBrokerService().pulsar().getConfiguration()
+ .getDispatcherReadFailureBackoffInitialTimeInMs()))
+ .build();
}
@Override
@@ -858,7 +859,7 @@ public class PersistentDispatcherMultipleConsumersClassic
extends AbstractPersis
public synchronized void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
ReadType readType = (ReadType) ctx;
- long waitTimeMillis = readFailureBackoff.next();
+ long waitTimeMillis = readFailureBackoff.next().toMillis();
// Do not keep reading more entries if the cursor is already closed.
if (exception instanceof
ManagedLedgerException.CursorAlreadyClosedException) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 9e6ba93b9dc..2420b2107ea 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -22,6 +22,7 @@ import static
org.apache.bookkeeper.mledger.util.ManagedLedgerUtils.readEntriesW
import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@@ -87,10 +88,11 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
this.name = topic.getName() + " / " + (cursor.getName() != null ?
Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
- this.readFailureBackoff = new
Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
- TimeUnit.MILLISECONDS,
serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
- TimeUnit.MILLISECONDS,
serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(),
- TimeUnit.MILLISECONDS);
+ this.readFailureBackoff = Backoff.builder()
+
.initialDelay(Duration.ofMillis(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs()))
+
.maxBackoff(Duration.ofMillis(serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs()))
+
.mandatoryStop(Duration.ofMillis(serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs()))
+ .build();
this.redeliveryTracker =
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.initializeDispatchRateLimiterIfNeeded();
}
@@ -475,7 +477,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
return;
}
- long waitTimeMillis = readFailureBackoff.next();
+ long waitTimeMillis = readFailureBackoff.next().toMillis();
if (exception instanceof NoMoreEntriesToReadException) {
if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index c1d73cd3891..d56f2147514 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
+import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -97,8 +98,10 @@ public abstract class PersistentReplicator extends
AbstractReplicator
protected int messageTTLInSeconds = 0;
- private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS,
- 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+ private final Backoff readFailureBackoff = Backoff.builder()
+ .initialDelay(Duration.ofSeconds(1))
+ .maxBackoff(Duration.ofMinutes(1))
+ .build();
private final PersistentMessageExpiryMonitor expiryMonitor;
// for connected subscriptions, message expiry will be checked if the
backlog is greater than this threshold
@@ -508,7 +511,7 @@ public abstract class PersistentReplicator extends
AbstractReplicator
// Reduce read batch size to avoid flooding bookies with retries
readBatchSize =
topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize();
- long waitTimeMillis = readFailureBackoff.next();
+ long waitTimeMillis = readFailureBackoff.next().toMillis();
if (exception instanceof CursorAlreadyClosedException) {
log.warn("[{}] Error reading entries because replicator is"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 591842927f3..bc4c74ab3b1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -25,6 +25,7 @@ import static
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWit
import static
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timer;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -145,8 +146,9 @@ public class PendingAckHandleImpl extends
PendingAckHandleState implements Pendi
private final long pendingAckInitFailureBackoffInitialTimeInMs = 100;
- public final Backoff backoff = new
Backoff(pendingAckInitFailureBackoffInitialTimeInMs, TimeUnit.MILLISECONDS,
- 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+ public final Backoff backoff = Backoff.builder()
+
.initialDelay(Duration.ofMillis(pendingAckInitFailureBackoffInitialTimeInMs))
+ .build();
private final Timer transactionOpTimer;
@@ -958,7 +960,7 @@ public class PendingAckHandleImpl extends
PendingAckHandleState implements Pendi
public void exceptionHandleFuture(Throwable t) {
if (isRetryableException(t)) {
this.state = State.None;
- long retryTime = backoff.next();
+ long retryTime = backoff.next().toMillis();
log.warn("[{}][{}] Failed to init transaction pending ack. It will
be retried in {} Ms",
persistentSubscription.getTopic().getName(), subName,
retryTime, t);
transactionOpTimer.newTimeout((timeout) -> init(), retryTime,
TimeUnit.MILLISECONDS);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 69f5208ce67..bace2bf87a9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -138,14 +138,16 @@ public class NamespaceBundleFactory {
future.completeExceptionally(e);
} else {
LOG.warn("Error loading bundle for {}. Retrying exception",
namespace, e);
- long retryDelay = backoff.next();
+ long retryDelay = backoff.next().toMillis();
pulsar.getExecutor().schedule(() ->
doLoadBundles(namespace, future, backoff, retryDeadline),
retryDelay, TimeUnit.MILLISECONDS);
}
}
private static Backoff createBackoff() {
- return new Backoff(100, TimeUnit.MILLISECONDS, 5, TimeUnit.SECONDS, 0,
TimeUnit.MILLISECONDS);
+ return Backoff.builder()
+ .maxBackoff(Duration.ofSeconds(5))
+ .build();
}
private NamespaceBundles readBundles(NamespaceName namespace,
LocalPolicies localPolicies, long version)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
index 96079ae6daa..77bfe6f8ded 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
@@ -25,14 +25,12 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
@@ -45,9 +43,9 @@ import org.testng.annotations.Test;
@Test(groups = "broker-impl")
public class ConnectionHandlerTest extends ProducerConsumerBase {
- private static final Backoff BACKOFF = new
BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS)
- .setMandatoryStop(1, TimeUnit.SECONDS)
- .setMax(3, TimeUnit.SECONDS).create();
+ private static final Backoff BACKOFF =
Backoff.builder().initialDelay(Duration.ofMillis(1))
+ .mandatoryStop(Duration.ofSeconds(1))
+ .maxBackoff(Duration.ofSeconds(3)).build();
private ExecutorService executor;
@BeforeClass(alwaysRun = true)
@@ -115,7 +113,7 @@ public class ConnectionHandlerTest extends
ProducerConsumerBase {
// 2. connectionFailed is called
final ConnectionHandler handler2 = new ConnectionHandler(
- new MockedHandlerState((PulsarClientImpl) pulsarClient, null),
new MockedBackoff(),
+ new MockedHandlerState((PulsarClientImpl) pulsarClient, null),
createMockedBackoff(),
cnx -> CompletableFuture.completedFuture(null));
FieldUtils.writeField(handler2, "duringConnect", duringConnect, true);
handler2.grabCnx();
@@ -125,7 +123,7 @@ public class ConnectionHandlerTest extends
ProducerConsumerBase {
// 3. connectionOpened completes exceptionally
final ConnectionHandler handler3 = new ConnectionHandler(
- new MockedHandlerState((PulsarClientImpl) pulsarClient,
"my-topic"), new MockedBackoff(),
+ new MockedHandlerState((PulsarClientImpl) pulsarClient,
"my-topic"), createMockedBackoff(),
cnx -> FutureUtil.failedFuture(new RuntimeException("fail")));
FieldUtils.writeField(handler3, "duringConnect", duringConnect, true);
handler3.grabCnx();
@@ -146,11 +144,12 @@ public class ConnectionHandlerTest extends
ProducerConsumerBase {
}
}
- private static class MockedBackoff extends Backoff {
-
- // Set a large backoff so that reconnection won't happen in tests
- public MockedBackoff() {
- super(1, TimeUnit.HOURS, 2, TimeUnit.HOURS, 1, TimeUnit.HOURS);
- }
+ // Set a large backoff so that reconnection won't happen in tests
+ private static Backoff createMockedBackoff() {
+ return Backoff.builder()
+ .initialDelay(Duration.ofHours(1))
+ .maxBackoff(Duration.ofHours(2))
+ .mandatoryStop(Duration.ofHours(1))
+ .build();
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
index 9efd14df7d6..555c0ca0c70 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
@@ -20,15 +20,14 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.Test;
@@ -42,11 +41,11 @@ public class RetryUtilTest {
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
CompletableFuture<Boolean> callback = new CompletableFuture<>();
AtomicInteger atomicInteger = new AtomicInteger(0);
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMax(2000, TimeUnit.MILLISECONDS)
- .setMandatoryStop(5000, TimeUnit.MILLISECONDS)
- .create();
+ Backoff backoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(100))
+ .maxBackoff(Duration.ofMillis(2000))
+ .mandatoryStop(Duration.ofMillis(5000))
+ .build();
RetryUtil.retryAsynchronously(() -> {
CompletableFuture<Boolean> future = new CompletableFuture<>();
atomicInteger.incrementAndGet();
@@ -66,11 +65,11 @@ public class RetryUtilTest {
@Cleanup("shutdownNow")
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
CompletableFuture<Boolean> callback = new CompletableFuture<>();
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(500, TimeUnit.MILLISECONDS)
- .setMax(2000, TimeUnit.MILLISECONDS)
- .setMandatoryStop(5000, TimeUnit.MILLISECONDS)
- .create();
+ Backoff backoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(500))
+ .maxBackoff(Duration.ofMillis(2000))
+ .mandatoryStop(Duration.ofMillis(5000))
+ .build();
long start = System.currentTimeMillis();
RetryUtil.retryAsynchronously(() ->
FutureUtil.failedFuture(new RuntimeException("fail")),
backoff, executor, callback);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 1f66e65179a..2f121a80e15 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -25,6 +25,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -47,7 +48,6 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
@@ -358,11 +358,9 @@ public class BinaryProtoLookupService implements
LookupService {
@Nullable Map<String, String> properties) {
CompletableFuture<GetTopicsResult> topicsFuture = new
CompletableFuture<>();
AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
- .setMax(1, TimeUnit.MINUTES)
- .create();
+ Backoff backoff = Backoff.builder()
+ .mandatoryStop(Duration.ofMillis(opTimeoutMs.get() * 2))
+ .build();
getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture,
mode,
topicsPattern, topicsHash, properties);
return topicsFuture;
@@ -404,7 +402,7 @@ public class BinaryProtoLookupService implements
LookupService {
client.getCnxPool().releaseConnection(clientCnx);
});
}, lookupPinnedExecutor).exceptionally((e) -> {
- long nextDelay = Math.min(backoff.next(), remainingTime.get());
+ long nextDelay = Math.min(backoff.next().toMillis(),
remainingTime.get());
if (nextDelay <= 0) {
getTopicsResultFuture.completeExceptionally(
new PulsarClientException.TimeoutException(
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index a4e262b35ae..01771fef9d6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -179,7 +179,7 @@ public class ConnectionHandler {
state.topic, state.getHandlerName(), state.getState());
return;
}
- long delayMs = backoff.next();
+ long delayMs = backoff.next().toMillis();
log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try
again in {} s",
state.topic, state.getHandlerName(),
exception.getMessage(), delayMs / 1000.0);
@@ -208,7 +208,7 @@ public class ConnectionHandler {
state.topic, state.getHandlerName(), state.getState());
return;
}
- long delayMs = initialConnectionDelayMs.orElse(backoff.next());
+ long delayMs = initialConnectionDelayMs.orElseGet(() ->
backoff.next().toMillis());
log.info("[{}] [{}] Closed connection {} -- Will try again in {}
s, hostUrl: {}",
state.topic, state.getHandlerName(), cnx.channel(),
delayMs / 1000.0, hostUrl.orElse(null));
state.client.timer().newTimeout(timeout -> {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 7091b05151e..868c45b277e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -39,6 +39,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -130,7 +131,6 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.ExceptionHandler;
import org.apache.pulsar.common.util.FutureUtil;
@@ -371,12 +371,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
this.connectionHandler = new ConnectionHandler(this,
- new BackoffBuilder()
-
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
- TimeUnit.NANOSECONDS)
-
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
- .setMandatoryStop(0, TimeUnit.MILLISECONDS)
- .create(),
+ Backoff.builder()
+
.initialDelay(Duration.ofNanos(client.getConfiguration()
+ .getInitialBackoffIntervalNanos()))
+
.maxBackoff(Duration.ofNanos(client.getConfiguration().getMaxBackoffIntervalNanos()))
+ .build(),
this);
this.topicName = TopicName.get(topic);
@@ -2459,15 +2458,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return producerSupplier.get();
} else {
// calculate backoff time for given failure count
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
-
.setMandatoryStop(client.getConfiguration().getOperationTimeoutMs() * 2,
- TimeUnit.MILLISECONDS)
- .setMax(1, TimeUnit.MINUTES)
- .create();
+ Backoff backoff = Backoff.builder()
+
.mandatoryStop(Duration.ofMillis(client.getConfiguration().getOperationTimeoutMs()
* 2))
+ .build();
long backoffTimeMillis = 0;
for (int i = 0; i < failureCount; i++) {
- backoffTimeMillis = backoff.next();
+ backoffTimeMillis = backoff.next().toMillis();
}
CompletableFuture<Producer<byte[]>> newProducer = new
CompletableFuture<>();
ScheduledExecutorService executor =
@@ -2569,11 +2565,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf
seek, MessageId seekId,
Long seekTimestamp,
String seekBy) {
AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
- .setMandatoryStop(0, TimeUnit.MILLISECONDS)
- .create();
+ Backoff backoff = Backoff.builder()
+ .maxBackoff(Duration.ofMillis(opTimeoutMs.get() * 2))
+ .build();
if (!seekStatus.compareAndSet(SeekStatus.NOT_STARTED,
SeekStatus.IN_PROGRESS)) {
final String message = String.format(
@@ -2631,7 +2625,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return null;
});
} else {
- long nextDelay = Math.min(backoff.next(), remainingTime.get());
+ long nextDelay = Math.min(backoff.next().toMillis(),
remainingTime.get());
if (nextDelay <= 0) {
failSeek(
new PulsarClientException.TimeoutException(
@@ -2838,11 +2832,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
- .setMandatoryStop(0, TimeUnit.MILLISECONDS)
- .create();
+ Backoff backoff = Backoff.builder()
+ .maxBackoff(Duration.ofMillis(opTimeoutMs.get() * 2))
+ .build();
CompletableFuture<GetLastMessageIdResponse> getLastMessageIdFuture =
new CompletableFuture<>();
@@ -2903,7 +2895,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
future.completeExceptionally(failReason);
return;
}
- long nextDelay = Math.min(backoff.next(), remainingTime.get());
+ long nextDelay = Math.min(backoff.next().toMillis(),
remainingTime.get());
if (nextDelay <= 0) {
future.completeExceptionally(
new PulsarClientException.TimeoutException(
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 89fa8ebb998..3d601465aac 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -48,6 +48,7 @@ import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@@ -102,7 +103,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.common.util.BackoffBuilder;
+import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@@ -326,11 +327,11 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
ConnectionHandler initConnectionHandler() {
return new ConnectionHandler(this,
- new BackoffBuilder()
-
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
-
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
- .setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() -
100), TimeUnit.MILLISECONDS)
- .create(),
+ Backoff.builder()
+
.initialDelay(Duration.ofNanos(client.getConfiguration().getInitialBackoffIntervalNanos()))
+
.maxBackoff(Duration.ofNanos(client.getConfiguration().getMaxBackoffIntervalNanos()))
+ .mandatoryStop(Duration.ofMillis(Math.max(100,
conf.getSendTimeoutMs() - 100)))
+ .build(),
this);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c681959126a..2082f8b1750 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -96,7 +96,6 @@ import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.topics.TopicsPattern;
import org.apache.pulsar.common.topics.TopicsPatternFactory;
import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.jspecify.annotations.Nullable;
@@ -1249,11 +1248,11 @@ public class PulsarClientImpl implements PulsarClient {
try {
TopicName topicName = TopicName.get(topic);
AtomicLong opTimeoutMs = new AtomicLong(conf.getLookupTimeoutMs());
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(conf.getInitialBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
- .setMandatoryStop(opTimeoutMs.get() * 2,
TimeUnit.MILLISECONDS)
- .setMax(conf.getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
- .create();
+ Backoff backoff = Backoff.builder()
+
.initialDelay(Duration.ofNanos(conf.getInitialBackoffIntervalNanos()))
+ .mandatoryStop(Duration.ofMillis(opTimeoutMs.get() * 2))
+
.maxBackoff(Duration.ofNanos(conf.getMaxBackoffIntervalNanos()))
+ .build();
getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs,
metadataFuture,
new AtomicInteger(0),
metadataAutoCreationEnabled,
useFallbackForNonPIP344Brokers);
@@ -1275,7 +1274,7 @@ public class PulsarClientImpl implements PulsarClient {
metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers);
queryFuture.thenAccept(future::complete).exceptionally(e -> {
remainingTime.addAndGet(-1 *
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
- long nextDelay = Math.min(backoff.next(), remainingTime.get());
+ long nextDelay = Math.min(backoff.next().toMillis(),
remainingTime.get());
// skip retry scheduler when set lookup throttle in client or
server side which will lead to
// `TooManyRequestsException`
boolean isLookupThrottling =
!PulsarClientException.isRetriableError(e.getCause())
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index 1b23b6ce2fa..a41d7634583 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Collectors;
@@ -37,7 +36,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
import org.apache.pulsar.common.net.ServiceURI;
import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
/**
* The default implementation of {@link ServiceNameResolver}.
@@ -220,10 +218,10 @@ public class PulsarServiceNameResolver implements
ServiceNameResolver {
* @return a new {@link EndpointStatus} instance
*/
private EndpointStatus createEndpointStatus(boolean isAvailable,
InetSocketAddress inetSocketAddress) {
- Backoff backoff = new BackoffBuilder()
- .setInitialTime(serviceUrlQuarantineInitDurationMs,
TimeUnit.MILLISECONDS)
- .setMax(serviceUrlQuarantineMaxDurationMs,
TimeUnit.MILLISECONDS)
- .create();
+ Backoff backoff = Backoff.builder()
+
.initialDelay(Duration.ofMillis(serviceUrlQuarantineInitDurationMs))
+
.maxBackoff(Duration.ofMillis(serviceUrlQuarantineMaxDurationMs))
+ .build();
EndpointStatus endpointStatus =
new EndpointStatus(inetSocketAddress, backoff,
System.currentTimeMillis(), 0,
isAvailable);
@@ -261,13 +259,13 @@ public class PulsarServiceNameResolver implements
ServiceNameResolver {
Duration.ofMillis(elapsedTimeMsSinceLast));
status.setAvailable(true);
status.setLastUpdateTimeStampMs(System.currentTimeMillis());
-
status.setNextDelayMsToRecover(status.getQuarantineBackoff().next());
+
status.setNextDelayMsToRecover(status.getQuarantineBackoff().next().toMillis());
}
} else {
// from available to unavailable
status.setAvailable(false);
status.setLastUpdateTimeStampMs(System.currentTimeMillis());
-
status.setNextDelayMsToRecover(status.getQuarantineBackoff().next());
+
status.setNextDelayMsToRecover(status.getQuarantineBackoff().next().toMillis());
}
} else if (!status.isAvailable()) {
// from unavailable to available
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 0c854a068a2..5f56fee818f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.client.impl;
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
@@ -36,7 +36,7 @@ import
org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.topics.TopicsPattern;
-import org.apache.pulsar.common.util.BackoffBuilder;
+import org.apache.pulsar.common.util.Backoff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,12 +77,11 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
this.patternConsumerUpdateQueue = patternConsumerUpdateQueue;
this.name = "Watcher(" + topicsPattern + ")";
this.connectionHandler = new ConnectionHandler(this,
- new BackoffBuilder()
-
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
- TimeUnit.NANOSECONDS)
-
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
- .setMandatoryStop(0, TimeUnit.MILLISECONDS)
- .create(),
+ Backoff.builder()
+
.initialDelay(Duration.ofNanos(client.getConfiguration()
+ .getInitialBackoffIntervalNanos()))
+
.maxBackoff(Duration.ofNanos(client.getConfiguration().getMaxBackoffIntervalNanos()))
+ .build(),
this);
this.topicsPattern = topicsPattern;
this.watcherId = watcherId;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 4ca742d98ea..c3dbb93c4d6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -27,6 +27,7 @@ import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.io.Closeable;
import java.io.IOException;
+import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -48,7 +49,6 @@ import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,11 +105,12 @@ public class TransactionMetaStoreHandler extends
HandlerState
pulsarClient.getConfiguration().getOperationTimeoutMs(),
TimeUnit.MILLISECONDS);
this.connectionHandler = new ConnectionHandler(
this,
- new BackoffBuilder()
-
.setInitialTime(pulsarClient.getConfiguration().getInitialBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
-
.setMax(pulsarClient.getConfiguration().getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
- .setMandatoryStop(100, TimeUnit.MILLISECONDS)
- .create(),
+ Backoff.builder()
+ .initialDelay(Duration.ofNanos(pulsarClient.getConfiguration()
+ .getInitialBackoffIntervalNanos()))
+
.maxBackoff(Duration.ofNanos(pulsarClient.getConfiguration().getMaxBackoffIntervalNanos()))
+ .mandatoryStop(Duration.ofMillis(100))
+ .build(),
this);
this.connectFuture = connectFuture;
this.internalPinnedExecutor =
pulsarClient.getInternalExecutorService();
@@ -296,7 +297,7 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
});
}
- , op.backoff.next(), TimeUnit.MILLISECONDS);
+ , op.backoff.next().toMillis(),
TimeUnit.MILLISECONDS);
return;
}
LOG.error("Got {} for request {} error {}",
BaseCommand.Type.NEW_TXN.name(),
@@ -381,7 +382,7 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
});
}
- , op.backoff.next(), TimeUnit.MILLISECONDS);
+ , op.backoff.next().toMillis(),
TimeUnit.MILLISECONDS);
return;
}
LOG.error("{} for request {}, transaction {}, error: {}",
@@ -478,7 +479,7 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
});
}
- , op.backoff.next(), TimeUnit.MILLISECONDS);
+ , op.backoff.next().toMillis(),
TimeUnit.MILLISECONDS);
return;
}
LOG.error("{} failed for request {} error {}.",
BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(),
@@ -560,7 +561,7 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
});
}
- , op.backoff.next(), TimeUnit.MILLISECONDS);
+ , op.backoff.next().toMillis(),
TimeUnit.MILLISECONDS);
return;
}
LOG.error("Got {} response for request {}, transaction {},
error: {}",
@@ -604,12 +605,11 @@ public class TransactionMetaStoreHandler extends
HandlerState
OpForTxnIdCallBack op = RECYCLER.get();
op.callback = callback;
op.cmd = cmd;
- op.backoff = new BackoffBuilder()
-
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
- TimeUnit.NANOSECONDS)
-
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10,
TimeUnit.NANOSECONDS)
- .setMandatoryStop(0, TimeUnit.MILLISECONDS)
- .create();
+ op.backoff = Backoff.builder()
+ .initialDelay(Duration.ofNanos(client.getConfiguration()
+ .getInitialBackoffIntervalNanos()))
+
.maxBackoff(Duration.ofNanos(client.getConfiguration().getMaxBackoffIntervalNanos()
/ 10))
+ .build();
op.description = description;
op.clientCnx = clientCnx;
return op;
@@ -646,12 +646,11 @@ public class TransactionMetaStoreHandler extends
HandlerState
OpForVoidCallBack op = RECYCLER.get();
op.callback = callback;
op.cmd = cmd;
- op.backoff = new BackoffBuilder()
-
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
- TimeUnit.NANOSECONDS)
-
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10,
TimeUnit.NANOSECONDS)
- .setMandatoryStop(0, TimeUnit.MILLISECONDS)
- .create();
+ op.backoff = Backoff.builder()
+ .initialDelay(Duration.ofNanos(client.getConfiguration()
+ .getInitialBackoffIntervalNanos()))
+
.maxBackoff(Duration.ofNanos(client.getConfiguration().getMaxBackoffIntervalNanos()
/ 10))
+ .build();
op.description = description;
op.clientCnx = clientCnx;
return op;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
index 912cb7d7c58..0d262d5b84f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
@@ -32,10 +32,10 @@ public class RetryUtil {
public static <T> void retryAsynchronously(Supplier<CompletableFuture<T>>
supplier, Backoff backoff,
ScheduledExecutorService
scheduledExecutorService,
CompletableFuture<T> callback) {
- if (backoff.getMax() <= 0) {
+ if (backoff.getMax().isZero() || backoff.getMax().isNegative()) {
throw new IllegalArgumentException("Illegal max retry time");
}
- if (backoff.getInitial() <= 0) {
+ if (backoff.getInitial().isZero() ||
backoff.getInitial().isNegative()) {
throw new IllegalArgumentException("Illegal initial time");
}
scheduledExecutorService.execute(() ->
@@ -47,7 +47,7 @@ public class RetryUtil {
CompletableFuture<T> callback) {
supplier.get().whenComplete((result, e) -> {
if (e != null) {
- long next = backoff.next();
+ long next = backoff.next().toMillis();
boolean isMandatoryStop = backoff.isMandatoryStopMade();
if (isMandatoryStop) {
callback.completeExceptionally(e);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 86500b2466a..62d6c0b3f7b 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -108,9 +108,9 @@ public class ConsumerImplTest {
public void testCorrectBackoffConfiguration() {
final Backoff backoff = consumer.getConnectionHandler().backoff;
ClientConfigurationData clientConfigurationData = new
ClientConfigurationData();
- Assert.assertEquals(backoff.getMax(),
+ Assert.assertEquals(backoff.getMax().toMillis(),
TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getMaxBackoffIntervalNanos()));
- Assert.assertEquals(backoff.next(),
+ Assert.assertEquals(backoff.next().toMillis(),
TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getInitialBackoffIntervalNanos()));
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java
index 842d3bbaa97..5957ce86796 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java
@@ -19,108 +19,204 @@
package org.apache.pulsar.common.util;
import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import lombok.Data;
+import lombok.Getter;
-// All variables are in TimeUnit millis by default
-@Data
+/**
+ * Exponential backoff with mandatory stop.
+ *
+ * <p>Delays start at {@code initialDelay} and double on every call to {@link
#next()}, up to
+ * {@code maxBackoff}. A random jitter of up to 10% is subtracted from each
value to avoid
+ * thundering-herd retries.
+ *
+ * <p>If a {@code mandatoryStop} duration is configured, the backoff tracks
wall-clock time from the
+ * first {@link #next()} call. Once the elapsed time plus the next delay would
exceed the mandatory
+ * stop, the delay is truncated so that the total does not exceed it, and
{@link #isMandatoryStopMade()}
+ * returns {@code true}. After the mandatory stop, backoff continues to grow
normally.
+ *
+ * <p>Use {@link #reset()} to restart the sequence from the initial delay.
+ *
+ * <pre>{@code
+ * Backoff backoff = Backoff.builder()
+ * .initialDelay(Duration.ofMillis(100))
+ * .maxBackoff(Duration.ofMinutes(1))
+ * .mandatoryStop(Duration.ofSeconds(30))
+ * .build();
+ *
+ * Duration delay = backoff.next();
+ * }</pre>
+ */
public class Backoff {
- public static final long DEFAULT_INTERVAL_IN_NANOSECONDS =
TimeUnit.MILLISECONDS.toNanos(100);
- public static final long MAX_BACKOFF_INTERVAL_NANOSECONDS =
TimeUnit.SECONDS.toNanos(30);
- private final long initial;
- private final long max;
- private final Clock clock;
- private long next;
- private long mandatoryStop;
+ private static final Duration DEFAULT_INITIAL_DELAY =
Duration.ofMillis(100);
+ private static final Duration DEFAULT_MAX_BACKOFF_INTERVAL =
Duration.ofMinutes(1);
+ private static final Random random = new Random();
- private long firstBackoffTimeInMillis;
- private boolean mandatoryStopMade = false;
+ @Getter
+ private final Duration initial;
+ @Getter
+ private final Duration max;
+ @Getter
+ private final Duration mandatoryStop;
+ private final Clock clock;
- private static final Random random = new Random();
+ private Duration next;
+ @Getter
+ private Instant firstBackoffTime;
+ @Getter
+ private boolean mandatoryStopMade;
- Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax,
long mandatoryStop,
- TimeUnit unitMandatoryStop, Clock clock) {
- this.initial = unitInitial.toMillis(initial);
- this.max = unitMax.toMillis(max);
- if (initial == 0 && max == 0 && mandatoryStop == 0) {
+ private Backoff(Duration initial, Duration max, Duration mandatoryStop,
Clock clock) {
+ this.initial = initial;
+ this.max = max;
+ this.mandatoryStop = mandatoryStop;
+ this.next = initial;
+ this.clock = clock;
+ this.firstBackoffTime = Instant.EPOCH;
+ if (initial.isZero() && max.isZero() && mandatoryStop.isZero()) {
this.mandatoryStopMade = true;
}
- this.next = this.initial;
- this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop);
- this.clock = clock;
- this.firstBackoffTimeInMillis = 0;
}
- public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit
unitMax, long mandatoryStop,
- TimeUnit unitMandatoryStop) {
- this(initial, unitInitial, max, unitMax, mandatoryStop,
unitMandatoryStop, Clock.systemDefaultZone());
+ /**
+ * Creates a {@link Backoff} with the default configuration (initial delay
100 ms, max 1 min,
+ * no mandatory stop).
+ *
+ * @return a new Backoff with default settings
+ */
+ public static Backoff create() {
+ return new Builder().build();
}
- public long next() {
- long current = this.next;
- if (current < max) {
- this.next = Math.min(this.next * 2, this.max);
+ /**
+ * Creates a new {@link Builder} with default settings.
+ *
+ * @return a new builder instance
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Returns the next backoff delay, advancing the internal state.
+ *
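+ * <p>The returned duration has millisecond granularity. It is never less than
+ * the initial delay (in whole milliseconds) and, as long as the initial delay
+ * does not exceed the max backoff, never more than the max backoff. For delays
+ * above 10 ms, a random jitter of up to 10% is subtracted to spread out
+ * concurrent retries.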
+ *
+ * @return the delay to wait before the next retry attempt
+ */
+ public Duration next() {
+ Duration current = this.next;
+ if (current.compareTo(max) < 0) {
+ Duration doubled = this.next.multipliedBy(2);
+ this.next = doubled.compareTo(this.max) < 0 ? doubled : this.max;
}
// Check for mandatory stop
if (!mandatoryStopMade) {
- long now = clock.millis();
- long timeElapsedSinceFirstBackoff = 0;
- if (initial == current) {
- firstBackoffTimeInMillis = now;
+ Instant now = clock.instant();
+ Duration timeElapsedSinceFirstBackoff = Duration.ZERO;
+ if (initial.equals(current)) {
+ firstBackoffTime = now;
} else {
- timeElapsedSinceFirstBackoff = now - firstBackoffTimeInMillis;
+ timeElapsedSinceFirstBackoff =
Duration.between(firstBackoffTime, now);
}
- if (timeElapsedSinceFirstBackoff + current > mandatoryStop) {
- current = Math.max(initial, mandatoryStop -
timeElapsedSinceFirstBackoff);
+ if
(timeElapsedSinceFirstBackoff.plus(current).compareTo(mandatoryStop) > 0) {
+ Duration remaining =
mandatoryStop.minus(timeElapsedSinceFirstBackoff);
+ current = remaining.compareTo(initial) > 0 ? remaining :
initial;
mandatoryStopMade = true;
}
}
// Randomly decrease the timeout up to 10% to avoid simultaneous
retries
- // If current < 10 then current/10 < 1 and we get an exception from
Random saying "Bound must be positive"
- if (current > 10) {
- current -= random.nextInt((int) current / 10);
+ long currentMillis = current.toMillis();
+ if (currentMillis > 10) {
+ currentMillis -= random.nextInt((int) currentMillis / 10);
}
- return Math.max(initial, current);
+ long initialMillis = initial.toMillis();
+ return Duration.ofMillis(Math.max(initialMillis, currentMillis));
}
+ /**
+ * Halves the next delay (but never below the initial delay).
+ * Useful after a partially successful operation to converge faster.
+ */
public void reduceToHalf() {
- if (next > initial) {
- this.next = Math.max(this.next / 2, this.initial);
+ if (next.compareTo(initial) > 0) {
+ Duration half = next.dividedBy(2);
+ this.next = half.compareTo(initial) > 0 ? half : initial;
}
}
+ /**
+ * Resets the backoff to its initial state so the next call to {@link
#next()} returns the
+ * initial delay again. Also resets the mandatory-stop tracking.
+ */
public void reset() {
this.next = this.initial;
- if (initial == 0 && max == 0 && mandatoryStop == 0) {
- this.mandatoryStopMade = true;
- } else {
- this.mandatoryStopMade = false;
- }
+ this.mandatoryStopMade = initial.isZero() && max.isZero() &&
mandatoryStop.isZero();
}
- public static boolean shouldBackoff(long initialTimestamp, TimeUnit
unitInitial, int failedAttempts,
- long defaultInterval, long
maxBackoffInterval) {
- long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
- long currentTime = System.nanoTime();
- long interval = defaultInterval;
- for (int i = 1; i < failedAttempts; i++) {
- interval = interval * 2;
- if (interval > maxBackoffInterval) {
- interval = maxBackoffInterval;
- break;
- }
+ /**
+ * Builder for {@link Backoff}.
+ *
+ * <p>Defaults: initial delay 100 ms, max backoff 1 min, no mandatory stop.
+ */
+ public static class Builder {
+ private Duration initialDelay = DEFAULT_INITIAL_DELAY;
+ private Duration maxBackoff = DEFAULT_MAX_BACKOFF_INTERVAL;
+ private Duration mandatoryStop = Duration.ZERO;
+ private Clock clock = Clock.systemDefaultZone();
+
+ /**
+ * Sets the initial (smallest) backoff delay. Defaults to 100 ms.
+ *
+ * @param initialDelay the initial delay
+ * @return this builder
+ */
+ public Builder initialDelay(Duration initialDelay) {
+ this.initialDelay = initialDelay;
+ return this;
}
- // if the current time is less than the time at which next retry
should occur, we should backoff
- return currentTime < (initialTimestampInNano + interval);
- }
+ /**
+ * Sets the upper bound for the backoff delay. Defaults to 1 min.
+ *
+ * @param maxBackoff the maximum delay
+ * @return this builder
+ */
+ public Builder maxBackoff(Duration maxBackoff) {
+ this.maxBackoff = maxBackoff;
+ return this;
+ }
+
+ /**
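+ * Sets the mandatory-stop deadline measured from the first {@link
Backoff#next()} call.
+ * Once the elapsed time plus the next delay would exceed this duration, the
+ * delay is truncated to the remaining time (but never below the initial delay)
+ * and {@link Backoff#isMandatoryStopMade()} returns {@code true}.
+ * Defaults to zero, which never shortens a delay; note that with a positive
+ * initial delay the first {@link Backoff#next()} call then already reports the
+ * mandatory stop as made.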
+ *
+ * @param mandatoryStop the mandatory stop duration
+ * @return this builder
+ */
+ public Builder mandatoryStop(Duration mandatoryStop) {
+ this.mandatoryStop = mandatoryStop;
+ return this;
+ }
- public static boolean shouldBackoff(long initialTimestamp, TimeUnit
unitInitial, int failedAttempts) {
- return Backoff.shouldBackoff(initialTimestamp, unitInitial,
failedAttempts,
- DEFAULT_INTERVAL_IN_NANOSECONDS,
MAX_BACKOFF_INTERVAL_NANOSECONDS);
+ Builder clock(Clock clock) {
+ this.clock = clock;
+ return this;
+ }
+
+ /**
+ * Builds a new {@link Backoff} instance with the configured
parameters.
+ *
+ * @return a new Backoff
+ */
+ public Backoff build() {
+ return new Backoff(initialDelay, maxBackoff, mandatoryStop, clock);
+ }
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java
deleted file mode 100644
index 69b39030081..00000000000
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.util;
-
-import java.time.Clock;
-import java.util.concurrent.TimeUnit;
-
-public class BackoffBuilder {
- private long initial;
- private TimeUnit unitInitial;
- private long max;
- private TimeUnit unitMax;
- private Clock clock;
- private long mandatoryStop;
- private TimeUnit unitMandatoryStop;
-
- public BackoffBuilder() {
- this.initial = 0;
- this.unitInitial = TimeUnit.MILLISECONDS;
- this.max = 0;
- this.unitMax = TimeUnit.MILLISECONDS;
- this.mandatoryStop = 0;
- this.unitMandatoryStop = TimeUnit.MILLISECONDS;
- this.clock = Clock.systemDefaultZone();
- }
-
- public BackoffBuilder setInitialTime(long initial, TimeUnit unitInitial) {
- this.unitInitial = unitInitial;
- this.initial = initial;
- return this;
- }
-
- public BackoffBuilder setMax(long max, TimeUnit unitMax) {
- this.unitMax = unitMax;
- this.max = max;
- return this;
- }
-
- public BackoffBuilder setMandatoryStop(long mandatoryStop, TimeUnit
unitMandatoryStop) {
- this.mandatoryStop = mandatoryStop;
- this.unitMandatoryStop = unitMandatoryStop;
- return this;
- }
-
-
- public Backoff create() {
- return new Backoff(initial, unitInitial, max, unitMax, mandatoryStop,
unitMandatoryStop, clock);
- }
-}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java
index 9ca9503b5c0..152ddee5156 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java
@@ -22,35 +22,31 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.time.Clock;
+import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
-import java.util.concurrent.TimeUnit;
import org.mockito.Mockito;
import org.testng.annotations.Test;
public class BackoffTest {
boolean withinTenPercentAndDecrementTimer(Backoff backoff, long t2) {
- long t1 = backoff.next();
+ long t1 = backoff.next().toMillis();
return (t1 >= t2 * 0.9 && t1 <= t2);
}
boolean checkExactAndDecrementTimer(Backoff backoff, long t2) {
- long t1 = backoff.next();
+ long t1 = backoff.next().toMillis();
return t1 == t2;
}
- @Test
- public void shouldBackoffTest() {
- // gives false
- assertFalse(Backoff.shouldBackoff(0L, TimeUnit.NANOSECONDS, 0));
- long currentTimestamp = System.nanoTime();
- // gives true
- assertTrue(Backoff.shouldBackoff(currentTimestamp,
TimeUnit.NANOSECONDS, 100));
- }
@Test
public void mandatoryStopTestNegativeTest() {
- Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, 60,
TimeUnit.SECONDS, 1900, TimeUnit.MILLISECONDS);
- assertEquals(backoff.next(), 100);
+ Backoff backoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(100))
+ .maxBackoff(Duration.ofSeconds(60))
+ .mandatoryStop(Duration.ofMillis(1900))
+ .build();
+ assertEquals(backoff.next().toMillis(), 100);
backoff.next(); // 200
backoff.next(); // 400
backoff.next(); // 800
@@ -60,30 +56,35 @@ public class BackoffTest {
@Test
public void firstBackoffTimerTest() {
Clock mockClock = Mockito.mock(Clock.class);
- Mockito.when(mockClock.millis())
- .thenReturn(0L)
- .thenReturn(300L);
+ Mockito.when(mockClock.instant())
+ .thenReturn(Instant.ofEpochMilli(0))
+ .thenReturn(Instant.ofEpochMilli(300));
- Backoff backoff = new Backoff(
- 100, TimeUnit.MILLISECONDS,
- 60, TimeUnit.SECONDS,
- 1900, TimeUnit.MILLISECONDS,
- mockClock
- );
+ Backoff backoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(100))
+ .maxBackoff(Duration.ofSeconds(60))
+ .mandatoryStop(Duration.ofMillis(1900))
+ .clock(mockClock)
+ .build();
- assertEquals(backoff.next(), 100);
+ assertEquals(backoff.next().toMillis(), 100);
- long firstBackOffTime = backoff.getFirstBackoffTimeInMillis();
+ Instant firstBackOffTime = backoff.getFirstBackoffTime();
backoff.reset();
- assertEquals(backoff.next(), 100);
- long diffBackOffTime = backoff.getFirstBackoffTimeInMillis() -
firstBackOffTime;
+ assertEquals(backoff.next().toMillis(), 100);
+ long diffBackOffTime = Duration.between(firstBackOffTime,
backoff.getFirstBackoffTime()).toMillis();
assertEquals(diffBackOffTime, 300);
}
@Test
public void basicTest() {
Clock mockClock = Clock.fixed(Instant.EPOCH, ZoneId.systemDefault());
- Backoff backoff = new Backoff(5, TimeUnit.MILLISECONDS, 60,
TimeUnit.SECONDS, 60, TimeUnit.SECONDS, mockClock);
+ Backoff backoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(5))
+ .maxBackoff(Duration.ofSeconds(60))
+ .mandatoryStop(Duration.ofSeconds(60))
+ .clock(mockClock)
+ .build();
assertTrue(checkExactAndDecrementTimer(backoff, 5));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 10));
backoff.reset();
@@ -93,18 +94,18 @@ public class BackoffTest {
@Test
public void maxTest() {
Clock mockClock = Mockito.mock(Clock.class);
- Mockito.when(mockClock.millis())
- .thenReturn(0L)
- .thenReturn(10L)
- .thenReturn(20L)
- .thenReturn(40L);
-
- Backoff backoff = new Backoff(
- 5, TimeUnit.MILLISECONDS,
- 20, TimeUnit.MILLISECONDS,
- 20, TimeUnit.MILLISECONDS,
- mockClock
- );
+ Mockito.when(mockClock.instant())
+ .thenReturn(Instant.ofEpochMilli(0))
+ .thenReturn(Instant.ofEpochMilli(10))
+ .thenReturn(Instant.ofEpochMilli(20))
+ .thenReturn(Instant.ofEpochMilli(40));
+
+ Backoff backoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(5))
+ .maxBackoff(Duration.ofMillis(20))
+ .mandatoryStop(Duration.ofMillis(20))
+ .clock(mockClock)
+ .build();
assertTrue(checkExactAndDecrementTimer(backoff, 5));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 10));
@@ -116,73 +117,73 @@ public class BackoffTest {
public void mandatoryStopTest() {
Clock mockClock = Mockito.mock(Clock.class);
- Backoff backoff = new Backoff(
- 100, TimeUnit.MILLISECONDS,
- 60, TimeUnit.SECONDS,
- 1900, TimeUnit.MILLISECONDS,
- mockClock
- );
+ Backoff backoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(100))
+ .maxBackoff(Duration.ofSeconds(60))
+ .mandatoryStop(Duration.ofMillis(1900))
+ .clock(mockClock)
+ .build();
- Mockito.when(mockClock.millis()).thenReturn(0L);
+ Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0));
assertTrue(checkExactAndDecrementTimer(backoff, 100));
- Mockito.when(mockClock.millis()).thenReturn(100L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 200));
- Mockito.when(mockClock.millis()).thenReturn(300L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
- Mockito.when(mockClock.millis()).thenReturn(700L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 800));
- Mockito.when(mockClock.millis()).thenReturn(1500L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1500));
// would have been 1600 w/o the mandatory stop
assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
assertTrue(backoff.isMandatoryStopMade());
- Mockito.when(mockClock.millis()).thenReturn(1900L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1900));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 3200));
- Mockito.when(mockClock.millis()).thenReturn(3200L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(3200));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 6400));
- Mockito.when(mockClock.millis()).thenReturn(3200L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(3200));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 12800));
- Mockito.when(mockClock.millis()).thenReturn(6400L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(6400));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 25600));
- Mockito.when(mockClock.millis()).thenReturn(12800L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(12800));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 51200));
- Mockito.when(mockClock.millis()).thenReturn(25600L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(25600));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000));
- Mockito.when(mockClock.millis()).thenReturn(51200L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(51200));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000));
- Mockito.when(mockClock.millis()).thenReturn(60000L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(60000));
backoff.reset();
- Mockito.when(mockClock.millis()).thenReturn(0L);
+ Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0));
assertTrue(checkExactAndDecrementTimer(backoff, 100));
- Mockito.when(mockClock.millis()).thenReturn(100L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 200));
- Mockito.when(mockClock.millis()).thenReturn(300L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
- Mockito.when(mockClock.millis()).thenReturn(700L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 800));
- Mockito.when(mockClock.millis()).thenReturn(1500L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1500));
// would have been 1600 w/o the mandatory stop
assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
backoff.reset();
- Mockito.when(mockClock.millis()).thenReturn(0L);
+ Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0));
assertTrue(checkExactAndDecrementTimer(backoff, 100));
- Mockito.when(mockClock.millis()).thenReturn(100L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 200));
- Mockito.when(mockClock.millis()).thenReturn(300L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
- Mockito.when(mockClock.millis()).thenReturn(700L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 800));
backoff.reset();
- Mockito.when(mockClock.millis()).thenReturn(0L);
+ Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0));
assertTrue(checkExactAndDecrementTimer(backoff, 100));
- Mockito.when(mockClock.millis()).thenReturn(100L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 200));
- Mockito.when(mockClock.millis()).thenReturn(300L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
- Mockito.when(mockClock.millis()).thenReturn(700L);
+
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700));
assertTrue(withinTenPercentAndDecrementTimer(backoff, 800));
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
index e5334038290..e797850f4f6 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
@@ -18,13 +18,13 @@
*/
package org.apache.pulsar.metadata.api;
+import java.time.Duration;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
-import org.apache.pulsar.common.util.BackoffBuilder;
+import org.apache.pulsar.common.util.Backoff;
/**
* The configuration builder for a {@link MetadataCache} config.
@@ -33,16 +33,16 @@ import org.apache.pulsar.common.util.BackoffBuilder;
@Getter
@ToString
public class MetadataCacheConfig<T> {
- private static final long DEFAULT_CACHE_REFRESH_TIME_MILLIS =
TimeUnit.MINUTES.toMillis(5);
- public static final BackoffBuilder DEFAULT_RETRY_BACKOFF_BUILDER =
- new BackoffBuilder().setInitialTime(5, TimeUnit.MILLISECONDS)
- .setMax(3, TimeUnit.SECONDS)
- .setMandatoryStop(30, TimeUnit.SECONDS);
+ private static final long DEFAULT_CACHE_REFRESH_TIME_MILLIS =
Duration.ofMinutes(5).toMillis();
+ public static final Backoff.Builder DEFAULT_RETRY_BACKOFF_BUILDER =
+ Backoff.builder().initialDelay(Duration.ofMillis(5))
+ .maxBackoff(Duration.ofSeconds(3))
+ .mandatoryStop(Duration.ofSeconds(30));
- public static final BackoffBuilder NO_RETRY_BACKOFF_BUILDER =
- new BackoffBuilder().setInitialTime(0, TimeUnit.MILLISECONDS)
- .setMax(0, TimeUnit.SECONDS)
- .setMandatoryStop(0, TimeUnit.SECONDS);
+ public static final Backoff.Builder NO_RETRY_BACKOFF_BUILDER =
+ Backoff.builder().initialDelay(Duration.ZERO)
+ .maxBackoff(Duration.ZERO)
+ .mandatoryStop(Duration.ZERO);
/**
* Specifies that active entries are eligible for automatic refresh once a
fixed duration has
@@ -68,6 +68,6 @@ public class MetadataCacheConfig<T> {
private final BiConsumer<String, Optional<CacheGetResult<T>>>
asyncReloadConsumer = null;
@Builder.Default
- private final BackoffBuilder retryBackoff = DEFAULT_RETRY_BACKOFF_BUILDER;
+ private final Backoff.Builder retryBackoff = DEFAULT_RETRY_BACKOFF_BUILDER;
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 25890852775..e27f9338f54 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -25,6 +25,8 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
@@ -373,9 +375,9 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
// if resource is updated by other than metadata-cache then
metadata-cache will get bad-version
// exception. so, try to invalidate the cache and try one more
time.
objCache.synchronous().invalidate(key);
- long elapsed = System.currentTimeMillis() -
backoff.getFirstBackoffTimeInMillis();
+ long elapsed = Duration.between(backoff.getFirstBackoffTime(),
Instant.now()).toMillis();
if (backoff.isMandatoryStopMade()) {
- if (backoff.getFirstBackoffTimeInMillis() == 0) {
+ if (Instant.EPOCH.equals(backoff.getFirstBackoffTime())) {
result.completeExceptionally(ex.getCause());
} else {
result.completeExceptionally(new TimeoutException(
@@ -383,10 +385,10 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
}
return null;
}
- final var next = backoff.next();
+ final long nextMs = backoff.next().toMillis();
log.info("Update key {} conflicts. Retrying in {} ms.
Mandatory stop: {}. Elapsed time: {} ms", key,
- next, backoff.isMandatoryStopMade(), elapsed);
- schedulerExecutor.schedule(() -> execute(op, key, result,
backoff), next, TimeUnit.MILLISECONDS);
+ nextMs, backoff.isMandatoryStopMade(), elapsed);
+ schedulerExecutor.schedule(() -> execute(op, key, result,
backoff), nextMs, TimeUnit.MILLISECONDS);
return null;
}
result.completeExceptionally(ex.getCause());
@@ -395,7 +397,7 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
}
private CompletableFuture<T>
executeWithRetry(Supplier<CompletableFuture<T>> op, String key) {
- final var backoff = cacheConfig.getRetryBackoff().create();
+ final var backoff = cacheConfig.getRetryBackoff().build();
CompletableFuture<T> result = new CompletableFuture<>();
execute(op, key, result, backoff);
return result;
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 692f224594c..2fe766a706a 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -73,10 +72,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
this.sequencer = FutureUtil.Sequencer.create();
this.state = State.Init;
this.executor = executor;
- this.backoff = new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMax(60, TimeUnit.SECONDS)
- .create();
+ this.backoff = Backoff.create();
}
@Override
@@ -251,7 +247,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T>
{
// on Reconnected or SessionReestablished events.
revalidateAfterReconnection = true;
- long delayMillis = backoff.next();
+ long delayMillis = backoff.next().toMillis();
log.warn("Failed to revalidate the lock at {}: {}
- Retrying in {} seconds", path,
realCause.getMessage(), delayMillis /
1000.0);
revalidateTask =
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index d188b63ba15..69e23775369 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -52,7 +53,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.util.BackoffBuilder;
+import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.CacheGetResult;
@@ -524,10 +525,10 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
MetadataCache<MyClass> cache = store.getMetadataCache(MyClass.class,
MetadataCacheConfig.builder()
- .retryBackoff(new BackoffBuilder()
- .setInitialTime(5, TimeUnit.MILLISECONDS)
- .setMax(1, TimeUnit.SECONDS)
- .setMandatoryStop(3, TimeUnit.SECONDS)).build());
+ .retryBackoff(Backoff.builder()
+ .initialDelay(Duration.ofMillis(5))
+ .maxBackoff(Duration.ofSeconds(1))
+ .mandatoryStop(Duration.ofSeconds(3))).build());
MetadataCache<MyClass> cacheRef = cache;
if (cache instanceof DualMetadataCache dc) {
@@ -704,10 +705,10 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
final var config = MetadataCacheConfig.builder().build();
assertEquals(config.getRefreshAfterWriteMillis(),
TimeUnit.MINUTES.toMillis(5));
assertEquals(config.getExpireAfterWriteMillis(),
TimeUnit.MINUTES.toMillis(10));
- final var backoff = config.getRetryBackoff().create();
- assertEquals(backoff.getInitial(), 5);
- assertEquals(backoff.getMax(), 3000);
- assertEquals(backoff.getMandatoryStop(), 30_000);
+ final var backoff = config.getRetryBackoff().build();
+ assertEquals(backoff.getInitial(), Duration.ofMillis(5));
+ assertEquals(backoff.getMax(), Duration.ofSeconds(3));
+ assertEquals(backoff.getMandatoryStop(), Duration.ofSeconds(30));
}
@Test
@@ -715,36 +716,36 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
final var config = MetadataCacheConfig.builder().retryBackoff(
MetadataCacheConfig.NO_RETRY_BACKOFF_BUILDER).build();
- final var backoff = config.getRetryBackoff().create();
+ final var backoff = config.getRetryBackoff().build();
- assertEquals(backoff.getInitial(), 0);
- assertEquals(backoff.getMax(), 0);
- assertEquals(backoff.getMandatoryStop(), 0);
+ assertEquals(backoff.getInitial(), Duration.ZERO);
+ assertEquals(backoff.getMax(), Duration.ZERO);
+ assertEquals(backoff.getMandatoryStop(), Duration.ZERO);
assertTrue(backoff.isMandatoryStopMade());
- assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
- assertEquals(backoff.next(), 0);
- assertEquals(backoff.next(), 0);
- assertEquals(backoff.next(), 0);
+ assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
+ assertEquals(backoff.next(), Duration.ZERO);
+ assertEquals(backoff.next(), Duration.ZERO);
+ assertEquals(backoff.next(), Duration.ZERO);
assertTrue(backoff.isMandatoryStopMade());
- assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
+ assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
backoff.reduceToHalf();
assertTrue(backoff.isMandatoryStopMade());
- assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
- assertEquals(backoff.next(), 0);
- assertEquals(backoff.next(), 0);
- assertEquals(backoff.next(), 0);
+ assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
+ assertEquals(backoff.next(), Duration.ZERO);
+ assertEquals(backoff.next(), Duration.ZERO);
+ assertEquals(backoff.next(), Duration.ZERO);
assertTrue(backoff.isMandatoryStopMade());
- assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
+ assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
backoff.reset();
assertTrue(backoff.isMandatoryStopMade());
- assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
- assertEquals(backoff.next(), 0);
- assertEquals(backoff.next(), 0);
- assertEquals(backoff.next(), 0);
+ assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
+ assertEquals(backoff.next(), Duration.ZERO);
+ assertEquals(backoff.next(), Duration.ZERO);
+ assertEquals(backoff.next(), Duration.ZERO);
assertTrue(backoff.isMandatoryStopMade());
- assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
+ assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
}
@Test