This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 27df79edd0f [improve][broker] Extract duplication in
AbstractTopic#incrementTopicEpochIfNeeded (#24520)
27df79edd0f is described below
commit 27df79edd0ffdf105c6b07220be90be96361ff07
Author: Ruimin MA <[email protected]>
AuthorDate: Tue Jul 29 16:16:01 2025 +0800
[improve][broker] Extract duplication in
AbstractTopic#incrementTopicEpochIfNeeded (#24520)
---
.../pulsar/broker/service/AbstractTopic.java | 154 +++++++--------------
1 file changed, 51 insertions(+), 103 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 5856b530bcb..e8253771ede 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -814,84 +814,28 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
"Topic has an existing exclusive producer:
" + exclusiveProducerName));
} else if (!producers.isEmpty()) {
return FutureUtil.failedFuture(new
ProducerFencedException("Topic has existing shared producers"));
- } else if (producer.getTopicEpoch().isPresent()
- && producer.getTopicEpoch().get() <
topicEpoch.orElse(-1L)) {
- // If a producer reconnects, but all the topic epoch has
already moved forward, this producer needs
- // to be fenced, because a new producer had been present
in between.
- return FutureUtil.failedFuture(new ProducerFencedException(
- String.format("Topic epoch has already moved.
Current epoch: %d, Producer epoch: %d",
- topicEpoch.get(),
producer.getTopicEpoch().get())));
- } else {
- // There are currently no existing producers
- hasExclusiveProducer = true;
- exclusiveProducerName = producer.getProducerName();
-
- CompletableFuture<Long> future;
- if (producer.getTopicEpoch().isPresent()) {
- future = setTopicEpoch(producer.getTopicEpoch().get());
- } else {
- future = incrementTopicEpoch(topicEpoch);
- }
- future.exceptionally(ex -> {
- hasExclusiveProducer = false;
- exclusiveProducerName = null;
- return null;
+ }
+ return handleTopicEpochForExclusiveProducer(producer);
+ case ExclusiveWithFencing:
+ if (hasExclusiveProducer || !producers.isEmpty()) {
+ // clear all waiting producers
+ // otherwise closing any producer will trigger the
promotion
+ // of the next pending producer
+ List<Pair<Producer, CompletableFuture<Optional<Long>>>>
waitingExclusiveProducersCopy =
+ new ArrayList<>(waitingExclusiveProducers);
+ waitingExclusiveProducers.clear();
+ waitingExclusiveProducersCopy.forEach((Pair<Producer,
+
CompletableFuture<Optional<Long>>> handle) -> {
+ log.info("[{}] Failing waiting producer {}", topic,
handle.getKey());
+ handle.getValue().completeExceptionally(new
ProducerFencedException("Fenced out"));
+ handle.getKey().close(true);
});
-
- return future.thenApply(epoch -> {
- topicEpoch = Optional.of(epoch);
- return topicEpoch;
+ producers.forEach((k, currentProducer) -> {
+ log.info("[{}] Fencing out producer {}", topic,
currentProducer);
+ currentProducer.close(true);
});
}
- case ExclusiveWithFencing:
- if (hasExclusiveProducer || !producers.isEmpty()) {
- // clear all waiting producers
- // otherwise closing any producer will trigger the
promotion
- // of the next pending producer
- List<Pair<Producer,
CompletableFuture<Optional<Long>>>> waitingExclusiveProducersCopy =
- new ArrayList<>(waitingExclusiveProducers);
- waitingExclusiveProducers.clear();
- waitingExclusiveProducersCopy.forEach((Pair<Producer,
-
CompletableFuture<Optional<Long>>> handle) -> {
- log.info("[{}] Failing waiting producer {}",
topic, handle.getKey());
- handle.getValue().completeExceptionally(new
ProducerFencedException("Fenced out"));
- handle.getKey().close(true);
- });
- producers.forEach((k, currentProducer) -> {
- log.info("[{}] Fencing out producer {}", topic,
currentProducer);
- currentProducer.close(true);
- });
- }
- if (producer.getTopicEpoch().isPresent()
- && producer.getTopicEpoch().get() <
topicEpoch.orElse(-1L)) {
- // If a producer reconnects, but all the topic epoch
has already moved forward,
- // this producer needs to be fenced, because a new
producer had been present in between.
- hasExclusiveProducer = false;
- return FutureUtil.failedFuture(new
ProducerFencedException(
- String.format("Topic epoch has already moved.
Current epoch: %d, Producer epoch: %d",
- topicEpoch.get(),
producer.getTopicEpoch().get())));
- } else {
- // There are currently no existing producers
- hasExclusiveProducer = true;
- exclusiveProducerName = producer.getProducerName();
-
- CompletableFuture<Long> future;
- if (producer.getTopicEpoch().isPresent()) {
- future =
setTopicEpoch(producer.getTopicEpoch().get());
- } else {
- future = incrementTopicEpoch(topicEpoch);
- }
- future.exceptionally(ex -> {
- hasExclusiveProducer = false;
- exclusiveProducerName = null;
- return null;
- });
-
- return future.thenApply(epoch -> {
- topicEpoch = Optional.of(epoch);
- return topicEpoch;
- });
- }
+ return handleTopicEpochForExclusiveProducer(producer);
case WaitForExclusive: {
if (hasExclusiveProducer || !producers.isEmpty()) {
CompletableFuture<Optional<Long>> future = new
CompletableFuture<>();
@@ -899,35 +843,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
waitingExclusiveProducers.add(Pair.of(producer, future));
producerQueuedFuture.complete(null);
return future;
- } else if (producer.getTopicEpoch().isPresent()
- && producer.getTopicEpoch().get() <
topicEpoch.orElse(-1L)) {
- // If a producer reconnects, but all the topic epoch has
already moved forward, this producer needs
- // to be fenced, because a new producer had been present
in between.
- return FutureUtil.failedFuture(new ProducerFencedException(
- String.format("Topic epoch has already moved.
Current epoch: %d, Producer epoch: %d",
- topicEpoch.get(),
producer.getTopicEpoch().get())));
- } else {
- // There are currently no existing producers
- hasExclusiveProducer = true;
- exclusiveProducerName = producer.getProducerName();
-
- CompletableFuture<Long> future;
- if (producer.getTopicEpoch().isPresent()) {
- future = setTopicEpoch(producer.getTopicEpoch().get());
- } else {
- future = incrementTopicEpoch(topicEpoch);
- }
- future.exceptionally(ex -> {
- hasExclusiveProducer = false;
- exclusiveProducerName = null;
- return null;
- });
-
- return future.thenApply(epoch -> {
- topicEpoch = Optional.of(epoch);
- return topicEpoch;
- });
}
+ return handleTopicEpochForExclusiveProducer(producer);
}
default:
@@ -943,6 +860,37 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
}
}
+ private CompletableFuture<Optional<Long>>
handleTopicEpochForExclusiveProducer(Producer producer) {
+ if (producer.getTopicEpoch().isPresent()
+ && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
+ // If a producer reconnects, but all the topic epoch has already
moved forward, this producer needs
+ // to be fenced, because a new producer had been present in
between.
+ return FutureUtil.failedFuture(new ProducerFencedException(
+ String.format("Topic epoch has already moved. Current
epoch: %d, Producer epoch: %d",
+ topicEpoch.get(),
producer.getTopicEpoch().get())));
+ }
+ // There are currently no existing producers
+ hasExclusiveProducer = true;
+ exclusiveProducerName = producer.getProducerName();
+
+ CompletableFuture<Long> future;
+ if (producer.getTopicEpoch().isPresent()) {
+ future = setTopicEpoch(producer.getTopicEpoch().get());
+ } else {
+ future = incrementTopicEpoch(topicEpoch);
+ }
+ future.exceptionally(ex -> {
+ hasExclusiveProducer = false;
+ exclusiveProducerName = null;
+ return null;
+ });
+
+ return future.thenApply(epoch -> {
+ topicEpoch = Optional.of(epoch);
+ return topicEpoch;
+ });
+ }
+
protected abstract CompletableFuture<Long> setTopicEpoch(long newEpoch);
protected abstract CompletableFuture<Long>
incrementTopicEpoch(Optional<Long> currentEpoch);