This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 35dae97c0bf [fix][broker] Return failed future instead of throwing
exception in async methods (#25289)
35dae97c0bf is described below
commit 35dae97c0bf5e3810d92ba12922e98a85d9634ec
Author: Hao Zhang <[email protected]>
AuthorDate: Tue Mar 10 01:04:10 2026 +0800
[fix][broker] Return failed future instead of throwing exception in async
methods (#25289)
Co-authored-by: 张浩 <[email protected]>
---
.../authorization/PulsarAuthorizationProvider.java | 2 +-
.../pulsar/broker/admin/impl/BrokersBase.java | 4 +--
.../pulsar/broker/admin/impl/NamespacesBase.java | 5 +--
.../broker/admin/impl/PersistentTopicsBase.java | 37 ++++++++++++----------
.../pulsar/broker/delayed/bucket/Bucket.java | 5 +--
.../channel/ServiceUnitStateChannelImpl.java | 2 +-
.../pulsar/broker/namespace/NamespaceService.java | 16 +++++++---
.../pulsar/broker/service/BrokerService.java | 22 +++++++++----
.../SystemTopicBasedTopicPoliciesService.java | 4 ++-
.../org/apache/pulsar/broker/service/Topic.java | 4 ++-
.../service/nonpersistent/NonPersistentTopic.java | 3 +-
.../prometheus/PrometheusMetricsGenerator.java | 3 +-
.../pulsar/broker/systopic/SystemTopicClient.java | 3 +-
.../pulsar/broker/web/PulsarWebResource.java | 6 +++-
.../compaction/PulsarCompactionServiceFactory.java | 10 ++++--
.../compaction/StrategicTwoPhaseCompactor.java | 2 +-
.../client/impl/PulsarChannelInitializer.java | 12 ++++---
.../org/apache/pulsar/common/util/FutureUtil.java | 21 ++++++++----
.../pulsar/common/util/netty/ChannelFutures.java | 6 ++--
.../pulsar/common/util/netty/NettyFutureUtil.java | 10 ++++--
.../common/util/netty/ChannelFuturesTest.java | 12 +++++--
21 files changed, 125 insertions(+), 64 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 976e7b7ee12..e4f2ac7e8cd 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -326,7 +326,7 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
boolean sameNamespace = namespaces.distinct().count() == 1;
if (!sameNamespace) {
- throw new IllegalArgumentException("The namespace should be the
same");
+ return FutureUtil.failedFuture(new IllegalArgumentException("The
namespace should be the same"));
}
return CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 1e4e4ff66dd..3ee2a1285d3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -420,7 +420,8 @@ public class BrokersBase extends AdminResource {
private CompletableFuture<Void>
internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {
if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) {
- throw new RestException(Status.PRECONDITION_FAILED, "Can't delete
non-dynamic configuration");
+ return FutureUtil.failedFuture(
+ new RestException(Status.PRECONDITION_FAILED, "Can't
delete non-dynamic configuration"));
} else {
return
dynamicConfigurationResources().setDynamicConfigurationAsync(old -> {
if (old != null) {
@@ -536,4 +537,3 @@ public class BrokersBase extends AdminResource {
return CompletableFuture.completedFuture(null);
}
}
-
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0c18d49ebea..dc173ddac40 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -748,7 +748,8 @@ public abstract class NamespacesBase extends AdminResource {
private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
boolean sameNamespace = namespaces.distinct().count() == 1;
if (!sameNamespace) {
- throw new RestException(Status.BAD_REQUEST, "The namespace should
be the same");
+ return FutureUtil.failedFuture(
+ new RestException(Status.BAD_REQUEST, "The namespace
should be the same"));
}
return CompletableFuture.completedFuture(null);
}
@@ -1971,7 +1972,7 @@ public abstract class NamespacesBase extends
AdminResource {
+ " Repl clusters: %s, allowed clusters: %s",
ns.toString(), policies.replication_clusters,
policies.allowed_clusters);
log.info(msg);
- throw new RestException(Status.BAD_REQUEST, msg);
+ return FutureUtil.failedFuture(new
RestException(Status.BAD_REQUEST, msg));
}
pulsar().getBrokerService().setCurrentClusterAllowedIfNoClusterIsAllowed(ns,
policies);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 76b3247c599..be98f7102b6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1074,8 +1074,8 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void>
internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNumToSet,
boolean isGlobal) {
if (maxUnackedNumToSet != null && maxUnackedNumToSet < 0) {
- throw new RestException(Status.PRECONDITION_FAILED,
- "maxUnackedNum must be 0 or more");
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "maxUnackedNum must be 0 or more"));
}
return pulsar().getTopicPoliciesService()
@@ -1099,8 +1099,8 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void>
internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNumToSet,
boolean isGlobal) {
if (maxUnackedNumToSet != null && maxUnackedNumToSet < 0) {
- throw new RestException(Status.PRECONDITION_FAILED,
- "maxUnackedNum must be 0 or more");
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "maxUnackedNum must be 0 or more"));
}
return pulsar().getTopicPoliciesService()
@@ -1112,7 +1112,8 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void>
internalSetDeduplicationSnapshotInterval(Integer intervalToSet,
boolean isGlobal) {
if (intervalToSet != null && intervalToSet < 0) {
- throw new RestException(Status.PRECONDITION_FAILED, "interval must
be 0 or more");
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "interval must be 0 or more"));
}
return pulsar().getTopicPoliciesService()
.updateTopicPoliciesAsync(topicName, isGlobal, intervalToSet
== null, policies -> {
@@ -3643,9 +3644,9 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void> internalSetMaxMessageSize(Integer
maxMessageSizeToSet, boolean isGlobal) {
if (maxMessageSizeToSet != null && (maxMessageSizeToSet < 0
|| maxMessageSizeToSet > config().getMaxMessageSize())) {
- throw new RestException(Status.PRECONDITION_FAILED
- , "topic-level maxMessageSize must be greater than or
equal to 0 "
- + "and must be smaller than that in the broker-level");
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "topic-level maxMessageSize must be greater than or equal
to 0 "
+ + "and must be smaller than that in the
broker-level"));
}
return pulsar().getTopicPoliciesService()
@@ -3673,8 +3674,8 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void> internalSetMaxProducers(Integer
maxProducersToSet, boolean isGlobal) {
if (maxProducersToSet != null && maxProducersToSet < 0) {
- throw new RestException(Status.PRECONDITION_FAILED,
- "maxProducers must be 0 or more");
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "maxProducers must be 0 or more"));
}
return pulsar().getTopicPoliciesService()
.updateTopicPoliciesAsync(topicName, isGlobal,
maxProducersToSet == null, policies -> {
@@ -3690,8 +3691,8 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void>
internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsToSet,
boolean isGlobal) {
if (maxSubscriptionsToSet != null && maxSubscriptionsToSet < 0) {
- throw new RestException(Status.PRECONDITION_FAILED,
- "maxSubscriptionsPerTopic must be 0 or more");
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "maxSubscriptionsPerTopic must be 0 or more"));
}
return pulsar().getTopicPoliciesService()
@@ -3776,8 +3777,8 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void> internalSetMaxConsumers(Integer
maxConsumersToSet, boolean isGlobal) {
if (maxConsumersToSet != null && maxConsumersToSet < 0) {
- throw new RestException(Status.PRECONDITION_FAILED,
- "maxConsumers must be 0 or more");
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "maxConsumers must be 0 or more"));
}
return pulsar().getTopicPoliciesService()
.updateTopicPoliciesAsync(topicName, isGlobal,
maxConsumersToSet == null, policies -> {
@@ -4678,7 +4679,7 @@ public class PersistentTopicsBase extends AdminResource {
futures.add(pulsar().getAdminClient().topics().trimTopicAsync(topicNamePartition.toString()));
} catch (Exception e) {
log.error("[{}] Failed to trim topic {}", clientAppId(),
topicNamePartition, e);
- throw new RestException(e);
+ return FutureUtil.failedFuture(new RestException(e));
}
}
return
FutureUtil.waitForAll(futures).thenAccept(asyncResponse::resume);
@@ -4792,7 +4793,8 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(
Integer maxConsumersPerSubscriptionToSet, boolean isGlobal) {
if (maxConsumersPerSubscriptionToSet != null &&
maxConsumersPerSubscriptionToSet < 0) {
- throw new RestException(Status.PRECONDITION_FAILED, "Invalid value
for maxConsumersPerSubscription");
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Invalid value for maxConsumersPerSubscription"));
}
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
isGlobal, false, policies -> {
policies.setMaxConsumersPerSubscription(maxConsumersPerSubscriptionToSet);
@@ -4821,7 +4823,8 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void> internalSetCompactionThreshold(Long
compactionThresholdToSet, boolean isGlobal) {
if (compactionThresholdToSet != null && compactionThresholdToSet < 0) {
- throw new RestException(Status.PRECONDITION_FAILED, "Invalid value
for compactionThreshold");
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Invalid value for compactionThreshold"));
}
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
isGlobal, false, policies -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index a1693b1553d..776f99b120c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -23,7 +23,6 @@ import static
org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTrack
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.AllArgsConstructor;
@@ -158,7 +157,9 @@ abstract class Bucket {
}
private CompletableFuture<Void> putBucketKeyId(String bucketKey, Long
bucketId) {
- Objects.requireNonNull(bucketId);
+ if (bucketId == null) {
+ return FutureUtil.failedFuture(new NullPointerException("Expected
bucketId should not be null"));
+ }
return sequencer.sequential(() -> {
return executeWithRetry(() -> cursor.putCursorProperty(bucketKey,
String.valueOf(bucketId)),
ManagedLedgerException.BadVersionException.class,
MaxRetryTimes);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 0e7f15e64db..6ea75d11de0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -671,7 +671,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private CompletableFuture<Void> publishOverrideEventAsync(String
serviceUnit,
ServiceUnitStateData override) {
if (!validateChannelState(Started, true)) {
- throw new IllegalStateException("Invalid channel state:" +
channelState.name());
+ return FutureUtil.failedFuture(new IllegalStateException("Invalid
channel state:" + channelState.name()));
}
EventType eventType = EventType.Override;
eventCounters.get(eventType).getTotal().incrementAndGet();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 71d686f9132..8fe0cd627c4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1145,8 +1145,12 @@ public class NamespaceService implements AutoCloseable {
*/
public CompletableFuture<Void>
updateNamespaceBundlesForPolicies(NamespaceName nsname,
NamespaceBundles nsBundles) {
- Objects.requireNonNull(nsname);
- Objects.requireNonNull(nsBundles);
+ if (nsname == null) {
+ return FutureUtil.failedFuture(new NullPointerException("Expected
NamespaceName should not be null"));
+ }
+ if (nsBundles == null) {
+ return FutureUtil.failedFuture(new NullPointerException("Expected
NamespaceBundles should not be null"));
+ }
return
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(nsname).thenCompose(policies
-> {
if (policies.isPresent()) {
@@ -1172,8 +1176,12 @@ public class NamespaceService implements AutoCloseable {
* @param nsBundles the new namespace bundles
*/
public CompletableFuture<Void> updateNamespaceBundles(NamespaceName
nsname, NamespaceBundles nsBundles) {
- Objects.requireNonNull(nsname);
- Objects.requireNonNull(nsBundles);
+ if (nsname == null) {
+ return FutureUtil.failedFuture(new NullPointerException("Expected
NamespaceName should not be null"));
+ }
+ if (nsBundles == null) {
+ return FutureUtil.failedFuture(new NullPointerException("Expected
NamespaceBundles should not be null"));
+ }
LocalPolicies localPolicies = nsBundles.toLocalPolicies();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a6164541144..f4df62879e5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service;
-import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
@@ -2011,7 +2010,9 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(@NonNull TopicName topicName) {
- requireNonNull(topicName);
+ if (topicName == null) {
+ return FutureUtil.failedFuture(new
NullPointerException("topicName"));
+ }
NamespaceName namespace = topicName.getNamespaceObject();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
@@ -3433,10 +3434,15 @@ public class BrokerService implements Closeable {
Optional<Policies> policies) {
final int defaultNumPartitions =
pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies);
final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
- checkArgument(defaultNumPartitions > 0,
- "Default number of partitions should be more than 0");
- checkArgument(maxPartitions <= 0 || defaultNumPartitions <=
maxPartitions,
- "Number of partitions should be less than or equal to " +
maxPartitions);
+ if (defaultNumPartitions <= 0) {
+ return FutureUtil.failedFuture(
+ new IllegalArgumentException("Default number of partitions
should be more than 0"));
+ }
+ if (maxPartitions > 0 && defaultNumPartitions > maxPartitions) {
+ return FutureUtil.failedFuture(
+ new IllegalArgumentException("Number of partitions should
be less than or equal to "
+ + maxPartitions));
+ }
PartitionedTopicMetadata configMetadata = new
PartitionedTopicMetadata(defaultNumPartitions);
@@ -3751,7 +3757,9 @@ public class BrokerService implements Closeable {
}
public @NonNull CompletableFuture<Boolean>
isAllowAutoSubscriptionCreationAsync(@NonNull TopicName tpName) {
- requireNonNull(tpName);
+ if (tpName == null) {
+ return FutureUtil.failedFuture(new NullPointerException("tpName"));
+ }
// Policies priority: topic level -> namespace level -> broker level
if (ExtensibleLoadManagerImpl.isInternalTopic(tpName.toString())) {
return CompletableFuture.completedFuture(true);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index c3d88b9c723..ad18af308e2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -565,7 +565,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
@VisibleForTesting
@NonNull CompletableFuture<Boolean> prepareInitPoliciesCacheAsync(@NonNull
NamespaceName namespace) {
- requireNonNull(namespace);
+ if (namespace == null) {
+ return FutureUtil.failedFuture(new NullPointerException("Expected
NamespaceName should not be null"));
+ }
if (closed.get()) {
return CompletableFuture.completedFuture(false);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 8f66d9c0e3e..fd3ffb1a34c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
@@ -291,7 +292,8 @@ public interface Topic {
* Get the last message position that can be dispatch.
*/
default CompletableFuture<Position> getLastDispatchablePosition() {
- throw new UnsupportedOperationException("getLastDispatchablePosition
is not supported by default");
+ return FutureUtil.failedFuture(
+ new UnsupportedOperationException("getLastDispatchablePosition
is not supported by default"));
}
CompletableFuture<MessageId> getLastMessageId();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 96a9f97d70f..2b14df499ae 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -1218,7 +1218,8 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
@Override
public CompletableFuture<MessageId> getLastMessageId() {
- throw new UnsupportedOperationException("getLastMessageId is not
supported on non-persistent topic");
+ return FutureUtil.failedFuture(
+ new UnsupportedOperationException("getLastMessageId is not
supported on non-persistent topic"));
}
private static final Logger log =
LoggerFactory.getLogger(NonPersistentTopic.class);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 97d5a7bc953..2bb6372c1cb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -60,6 +60,7 @@ import
org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
/**
@@ -139,7 +140,7 @@ public class PrometheusMetricsGenerator implements
AutoCloseable {
public synchronized CompletableFuture<ByteBuf>
getCompressedBuffer(Executor executor) {
if (released) {
- throw new IllegalStateException("Already released!");
+ return FutureUtil.failedFuture(new
IllegalStateException("Already released!"));
}
if (compressedBuffer == null) {
compressedBuffer = new CompletableFuture<>();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index 88ca099b4ca..20521780bff 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
/**
* Pulsar system topic.
@@ -123,7 +124,7 @@ public interface SystemTopicClient<T> {
* @return message id future
*/
default CompletableFuture<MessageId> deleteAsync(String key, T t) {
- throw new UnsupportedOperationException("Unsupported operation");
+ return FutureUtil.failedFuture(new
UnsupportedOperationException("Unsupported operation"));
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 65489eaa34b..9291097d271 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -213,7 +213,11 @@ public abstract class PulsarWebResource {
isClientAuthenticated(appId), appId);
}
String originalPrincipal = originalPrincipal();
- validateOriginalPrincipal(appId, originalPrincipal);
+ try {
+ validateOriginalPrincipal(appId, originalPrincipal);
+ } catch (RestException e) {
+ return FutureUtil.failedFuture(e);
+ }
if (pulsar.getConfiguration().getProxyRoles().contains(appId)) {
BrokerService brokerService = pulsar.getBrokerService();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
index ff0fa0bcd12..b44480378c6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
@@ -19,13 +19,13 @@
package org.apache.pulsar.compaction;
import com.google.common.annotations.VisibleForTesting;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.util.FutureUtil;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;
@@ -61,14 +61,18 @@ public class PulsarCompactionServiceFactory implements
CompactionServiceFactory
@Override
public CompletableFuture<Void> initialize(@NonNull PulsarService
pulsarService) {
- Objects.requireNonNull(pulsarService);
+ if (pulsarService == null) {
+ return FutureUtil.failedFuture(new NullPointerException("Expected
pulsarService should not be null"));
+ }
this.pulsarService = pulsarService;
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<TopicCompactionService>
newTopicCompactionService(@NonNull String topic) {
- Objects.requireNonNull(topic);
+ if (topic == null) {
+ return FutureUtil.failedFuture(new NullPointerException("Expected
topic should not be null"));
+ }
PulsarTopicCompactionService pulsarTopicCompactionService =
new PulsarTopicCompactionService(topic,
pulsarService.getBookKeeperClient(), () -> {
try {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index 1b54092d9aa..3c674975742 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -76,7 +76,7 @@ public class StrategicTwoPhaseCompactor extends
PublishingOrderCompactor {
}
public CompletableFuture<Long> compact(String topic) {
- throw new UnsupportedOperationException();
+ return FutureUtil.failedFuture(new UnsupportedOperationException());
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 0d3f8a4c619..84198967601 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -27,7 +27,6 @@ import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslHandler;
import java.net.InetSocketAddress;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -40,6 +39,7 @@ import
org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.FrameDecoderUtil;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.apache.pulsar.common.util.SecurityUtility;
@@ -106,10 +106,14 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
* @return a {@link CompletableFuture} that completes when the TLS is set
up.
*/
CompletableFuture<Channel> initTls(Channel ch, InetSocketAddress sniHost) {
- Objects.requireNonNull(ch, "A channel is required");
- Objects.requireNonNull(sniHost, "A sniHost is required");
+ if (ch == null) {
+ return FutureUtil.failedFuture(new NullPointerException("A channel
is required"));
+ }
+ if (sniHost == null) {
+ return FutureUtil.failedFuture(new NullPointerException("A sniHost
is required"));
+ }
if (!tlsEnabled) {
- throw new IllegalStateException("TLS is not enabled in client
configuration");
+ return FutureUtil.failedFuture(new IllegalStateException("TLS is
not enabled in client configuration"));
}
CompletableFuture<Channel> initTlsFuture = new CompletableFuture<>();
ch.eventLoop().execute(() -> {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index ae784a1b18d..1b43d17d9d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -228,10 +227,13 @@ public class FutureUtil {
}
/**
- * @throws NullPointerException NPE when param is null
+ * @return a {@link CompletableFuture} representing the newly
scheduled task,
+ * or one completed exceptionally with {@link NullPointerException} if
param is null.
*/
public synchronized CompletableFuture<T>
sequential(Supplier<CompletableFuture<T>> newTask) {
- Objects.requireNonNull(newTask);
+ if (newTask == null) {
+ return failedFuture(new NullPointerException("Expected
Supplier should not be null"));
+ }
if (sequencerFuture.isDone()) {
if (sequencerFuture.isCompletedExceptionally() &&
allowExceptionBreakChain) {
return sequencerFuture;
@@ -282,13 +284,18 @@ public class FutureUtil {
}
/**
- * @throws RejectedExecutionException if this task cannot be accepted for
execution
- * @throws NullPointerException if one of params is null
+ * @return a {@link CompletableFuture} representing the asynchronous
composition.
+ * The returned future is completed exceptionally with {@link
NullPointerException} if one of params is null,
+ * or with {@link RejectedExecutionException} if the task cannot be
accepted for execution.
*/
public static <T> @NonNull CompletableFuture<T>
composeAsync(Supplier<CompletableFuture<T>> futureSupplier,
Executor
executor) {
- Objects.requireNonNull(futureSupplier);
- Objects.requireNonNull(executor);
+ if (futureSupplier == null) {
+ return failedFuture(new NullPointerException("Expected Supplier
should not be null"));
+ }
+ if (executor == null) {
+ return failedFuture(new NullPointerException("Expected Executor
should not be null"));
+ }
final CompletableFuture<T> future = new CompletableFuture<>();
try {
executor.execute(() -> futureSupplier.get().whenComplete((result,
error) -> {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java
index 294c0db99ea..481d47a21c2 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.common.util.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.util.FutureUtil;
/**
* Static utility methods for operating on {@link ChannelFuture}s.
@@ -41,7 +41,9 @@ public class ChannelFutures {
* and completes exceptionally if the channelFuture completes with
a {@link Throwable}
*/
public static CompletableFuture<Channel> toCompletableFuture(ChannelFuture
channelFuture) {
- Objects.requireNonNull(channelFuture, "channelFuture cannot be null");
+ if (channelFuture == null) {
+ return FutureUtil.failedFuture(new
NullPointerException("channelFuture cannot be null"));
+ }
CompletableFuture<Channel> adapter = new CompletableFuture<>();
if (channelFuture.isDone()) {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java
index 8686a381679..109fa2fa265 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java
@@ -19,8 +19,8 @@
package org.apache.pulsar.common.util.netty;
import io.netty.util.concurrent.Future;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.util.FutureUtil;
/**
* Contains utility methods for working with Netty Futures.
@@ -34,7 +34,9 @@ public class NettyFutureUtil {
* @return converted future instance
*/
public static <V> CompletableFuture<V> toCompletableFuture(Future<V>
future) {
- Objects.requireNonNull(future, "future cannot be null");
+ if (future == null) {
+ return FutureUtil.failedFuture(new NullPointerException("future
cannot be null"));
+ }
CompletableFuture<V> adapter = new CompletableFuture<>();
if (future.isDone()) {
@@ -62,7 +64,9 @@ public class NettyFutureUtil {
* @return converted future instance
*/
public static CompletableFuture<Void> toCompletableFutureVoid(Future<?>
future) {
- Objects.requireNonNull(future, "future cannot be null");
+ if (future == null) {
+ return FutureUtil.failedFuture(new NullPointerException("future
cannot be null"));
+ }
CompletableFuture<Void> adapter = new CompletableFuture<>();
if (future.isDone()) {
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java
index 31c0eb4af2b..65b9e7b6339 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java
@@ -23,6 +23,7 @@ import io.netty.channel.Channel;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.DefaultEventLoop;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.mockito.Mock;
@@ -65,9 +66,16 @@ public class ChannelFuturesTest {
channelFuture = new DefaultChannelPromise(channel);
}
- @Test(expectedExceptions = NullPointerException.class)
+ @Test
public void toCompletableFuture_shouldRequireNonNullArgument() {
- ChannelFutures.toCompletableFuture(null);
+ CompletableFuture<Channel> future =
ChannelFutures.toCompletableFuture(null);
+ Assert.assertTrue(future.isCompletedExceptionally());
+ try {
+ future.join();
+ Assert.fail("Expected NullPointerException");
+ } catch (CompletionException e) {
+ Assert.assertTrue(e.getCause() instanceof NullPointerException);
+ }
}
@Test