This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 35dae97c0bf [fix][broker] Return failed future instead of throwing exception in async methods (#25289)
35dae97c0bf is described below

commit 35dae97c0bf5e3810d92ba12922e98a85d9634ec
Author: Hao Zhang <[email protected]>
AuthorDate: Tue Mar 10 01:04:10 2026 +0800

    [fix][broker] Return failed future instead of throwing exception in async methods (#25289)
    
    Co-authored-by: 张浩 <[email protected]>
---
 .../authorization/PulsarAuthorizationProvider.java |  2 +-
 .../pulsar/broker/admin/impl/BrokersBase.java      |  4 +--
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  5 +--
 .../broker/admin/impl/PersistentTopicsBase.java    | 37 ++++++++++++----------
 .../pulsar/broker/delayed/bucket/Bucket.java       |  5 +--
 .../channel/ServiceUnitStateChannelImpl.java       |  2 +-
 .../pulsar/broker/namespace/NamespaceService.java  | 16 +++++++---
 .../pulsar/broker/service/BrokerService.java       | 22 +++++++++----
 .../SystemTopicBasedTopicPoliciesService.java      |  4 ++-
 .../org/apache/pulsar/broker/service/Topic.java    |  4 ++-
 .../service/nonpersistent/NonPersistentTopic.java  |  3 +-
 .../prometheus/PrometheusMetricsGenerator.java     |  3 +-
 .../pulsar/broker/systopic/SystemTopicClient.java  |  3 +-
 .../pulsar/broker/web/PulsarWebResource.java       |  6 +++-
 .../compaction/PulsarCompactionServiceFactory.java | 10 ++++--
 .../compaction/StrategicTwoPhaseCompactor.java     |  2 +-
 .../client/impl/PulsarChannelInitializer.java      | 12 ++++---
 .../org/apache/pulsar/common/util/FutureUtil.java  | 21 ++++++++----
 .../pulsar/common/util/netty/ChannelFutures.java   |  6 ++--
 .../pulsar/common/util/netty/NettyFutureUtil.java  | 10 ++++--
 .../common/util/netty/ChannelFuturesTest.java      | 12 +++++--
 21 files changed, 125 insertions(+), 64 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 976e7b7ee12..e4f2ac7e8cd 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -326,7 +326,7 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
     private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
         boolean sameNamespace = namespaces.distinct().count() == 1;
         if (!sameNamespace) {
-            throw new IllegalArgumentException("The namespace should be the 
same");
+            return FutureUtil.failedFuture(new IllegalArgumentException("The 
namespace should be the same"));
         }
         return CompletableFuture.completedFuture(null);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 1e4e4ff66dd..3ee2a1285d3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -420,7 +420,8 @@ public class BrokersBase extends AdminResource {
 
     private CompletableFuture<Void> 
internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {
         if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Can't delete 
non-dynamic configuration");
+            return FutureUtil.failedFuture(
+                    new RestException(Status.PRECONDITION_FAILED, "Can't 
delete non-dynamic configuration"));
         } else {
             return 
dynamicConfigurationResources().setDynamicConfigurationAsync(old -> {
                 if (old != null) {
@@ -536,4 +537,3 @@ public class BrokersBase extends AdminResource {
         return CompletableFuture.completedFuture(null);
     }
 }
-
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0c18d49ebea..dc173ddac40 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -748,7 +748,8 @@ public abstract class NamespacesBase extends AdminResource {
     private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
         boolean sameNamespace = namespaces.distinct().count() == 1;
         if (!sameNamespace) {
-            throw new RestException(Status.BAD_REQUEST, "The namespace should 
be the same");
+            return FutureUtil.failedFuture(
+                    new RestException(Status.BAD_REQUEST, "The namespace 
should be the same"));
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -1971,7 +1972,7 @@ public abstract class NamespacesBase extends 
AdminResource {
                     + " Repl clusters: %s, allowed clusters: %s",
                     ns.toString(), policies.replication_clusters, 
policies.allowed_clusters);
             log.info(msg);
-            throw new RestException(Status.BAD_REQUEST, msg);
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, msg));
         }
         
pulsar().getBrokerService().setCurrentClusterAllowedIfNoClusterIsAllowed(ns, 
policies);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 76b3247c599..be98f7102b6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1074,8 +1074,8 @@ public class PersistentTopicsBase extends AdminResource {
     protected CompletableFuture<Void> 
internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNumToSet,
                                                                                
   boolean isGlobal) {
         if (maxUnackedNumToSet != null && maxUnackedNumToSet < 0) {
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "maxUnackedNum must be 0 or more");
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "maxUnackedNum must be 0 or more"));
         }
 
         return pulsar().getTopicPoliciesService()
@@ -1099,8 +1099,8 @@ public class PersistentTopicsBase extends AdminResource {
     protected CompletableFuture<Void> 
internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNumToSet,
                                                                               
boolean isGlobal) {
         if (maxUnackedNumToSet != null && maxUnackedNumToSet < 0) {
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "maxUnackedNum must be 0 or more");
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "maxUnackedNum must be 0 or more"));
         }
 
         return pulsar().getTopicPoliciesService()
@@ -1112,7 +1112,8 @@ public class PersistentTopicsBase extends AdminResource {
     protected CompletableFuture<Void> 
internalSetDeduplicationSnapshotInterval(Integer intervalToSet,
                                                                                
boolean isGlobal) {
         if (intervalToSet != null && intervalToSet < 0) {
-            throw new RestException(Status.PRECONDITION_FAILED, "interval must 
be 0 or more");
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "interval must be 0 or more"));
         }
         return pulsar().getTopicPoliciesService()
                 .updateTopicPoliciesAsync(topicName, isGlobal, intervalToSet 
== null, policies -> {
@@ -3643,9 +3644,9 @@ public class PersistentTopicsBase extends AdminResource {
     protected CompletableFuture<Void> internalSetMaxMessageSize(Integer 
maxMessageSizeToSet, boolean isGlobal) {
         if (maxMessageSizeToSet != null && (maxMessageSizeToSet < 0
                 || maxMessageSizeToSet > config().getMaxMessageSize())) {
-            throw new RestException(Status.PRECONDITION_FAILED
-                    , "topic-level maxMessageSize must be greater than or 
equal to 0 "
-                    + "and must be smaller than that in the broker-level");
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "topic-level maxMessageSize must be greater than or equal 
to 0 "
+                            + "and must be smaller than that in the 
broker-level"));
         }
 
         return pulsar().getTopicPoliciesService()
@@ -3673,8 +3674,8 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected CompletableFuture<Void> internalSetMaxProducers(Integer 
maxProducersToSet, boolean isGlobal) {
         if (maxProducersToSet != null && maxProducersToSet < 0) {
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "maxProducers must be 0 or more");
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "maxProducers must be 0 or more"));
         }
         return pulsar().getTopicPoliciesService()
                 .updateTopicPoliciesAsync(topicName, isGlobal, 
maxProducersToSet == null, policies -> {
@@ -3690,8 +3691,8 @@ public class PersistentTopicsBase extends AdminResource {
     protected CompletableFuture<Void> 
internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsToSet,
                                                                           
boolean isGlobal) {
         if (maxSubscriptionsToSet != null && maxSubscriptionsToSet < 0) {
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "maxSubscriptionsPerTopic must be 0 or more");
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "maxSubscriptionsPerTopic must be 0 or more"));
         }
 
         return pulsar().getTopicPoliciesService()
@@ -3776,8 +3777,8 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected CompletableFuture<Void> internalSetMaxConsumers(Integer 
maxConsumersToSet, boolean isGlobal) {
         if (maxConsumersToSet != null && maxConsumersToSet < 0) {
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "maxConsumers must be 0 or more");
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "maxConsumers must be 0 or more"));
         }
         return pulsar().getTopicPoliciesService()
                 .updateTopicPoliciesAsync(topicName, isGlobal, 
maxConsumersToSet == null, policies -> {
@@ -4678,7 +4679,7 @@ public class PersistentTopicsBase extends AdminResource {
                 
futures.add(pulsar().getAdminClient().topics().trimTopicAsync(topicNamePartition.toString()));
             } catch (Exception e) {
                 log.error("[{}] Failed to trim topic {}", clientAppId(), 
topicNamePartition, e);
-                throw new RestException(e);
+                return FutureUtil.failedFuture(new RestException(e));
             }
         }
         return 
FutureUtil.waitForAll(futures).thenAccept(asyncResponse::resume);
@@ -4792,7 +4793,8 @@ public class PersistentTopicsBase extends AdminResource {
     protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(
             Integer maxConsumersPerSubscriptionToSet, boolean isGlobal) {
         if (maxConsumersPerSubscriptionToSet != null && 
maxConsumersPerSubscriptionToSet < 0) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for maxConsumersPerSubscription");
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "Invalid value for maxConsumersPerSubscription"));
         }
         return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
isGlobal, false, policies -> {
             
policies.setMaxConsumersPerSubscription(maxConsumersPerSubscriptionToSet);
@@ -4821,7 +4823,8 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected CompletableFuture<Void> internalSetCompactionThreshold(Long 
compactionThresholdToSet, boolean isGlobal) {
         if (compactionThresholdToSet != null && compactionThresholdToSet < 0) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Invalid value 
for compactionThreshold");
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "Invalid value for compactionThreshold"));
         }
 
         return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
isGlobal, false, policies -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index a1693b1553d..776f99b120c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -23,7 +23,6 @@ import static 
org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTrack
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import lombok.AllArgsConstructor;
@@ -158,7 +157,9 @@ abstract class Bucket {
     }
 
     private CompletableFuture<Void> putBucketKeyId(String bucketKey, Long 
bucketId) {
-        Objects.requireNonNull(bucketId);
+        if (bucketId == null) {
+            return FutureUtil.failedFuture(new NullPointerException("Expected 
bucketId should not be null"));
+        }
         return sequencer.sequential(() -> {
             return executeWithRetry(() -> cursor.putCursorProperty(bucketKey, 
String.valueOf(bucketId)),
                     ManagedLedgerException.BadVersionException.class, 
MaxRetryTimes);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 0e7f15e64db..6ea75d11de0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -671,7 +671,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private CompletableFuture<Void> publishOverrideEventAsync(String 
serviceUnit,
                                                               
ServiceUnitStateData override) {
         if (!validateChannelState(Started, true)) {
-            throw new IllegalStateException("Invalid channel state:" + 
channelState.name());
+            return FutureUtil.failedFuture(new IllegalStateException("Invalid 
channel state:" + channelState.name()));
         }
         EventType eventType = EventType.Override;
         eventCounters.get(eventType).getTotal().incrementAndGet();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 71d686f9132..8fe0cd627c4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1145,8 +1145,12 @@ public class NamespaceService implements AutoCloseable {
      */
     public CompletableFuture<Void> 
updateNamespaceBundlesForPolicies(NamespaceName nsname,
                                                                       
NamespaceBundles nsBundles) {
-        Objects.requireNonNull(nsname);
-        Objects.requireNonNull(nsBundles);
+        if (nsname == null) {
+            return FutureUtil.failedFuture(new NullPointerException("Expected 
NamespaceName should not be null"));
+        }
+        if (nsBundles == null) {
+            return FutureUtil.failedFuture(new NullPointerException("Expected 
NamespaceBundles should not be null"));
+        }
 
         return 
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(nsname).thenCompose(policies
 -> {
             if (policies.isPresent()) {
@@ -1172,8 +1176,12 @@ public class NamespaceService implements AutoCloseable {
      * @param nsBundles the new namespace bundles
      */
     public CompletableFuture<Void> updateNamespaceBundles(NamespaceName 
nsname, NamespaceBundles nsBundles) {
-        Objects.requireNonNull(nsname);
-        Objects.requireNonNull(nsBundles);
+        if (nsname == null) {
+            return FutureUtil.failedFuture(new NullPointerException("Expected 
NamespaceName should not be null"));
+        }
+        if (nsBundles == null) {
+            return FutureUtil.failedFuture(new NullPointerException("Expected 
NamespaceBundles should not be null"));
+        }
 
         LocalPolicies localPolicies = nsBundles.toLocalPolicies();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a6164541144..f4df62879e5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
@@ -2011,7 +2010,9 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(@NonNull TopicName topicName) {
-        requireNonNull(topicName);
+        if (topicName == null) {
+            return FutureUtil.failedFuture(new 
NullPointerException("topicName"));
+        }
         NamespaceName namespace = topicName.getNamespaceObject();
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
@@ -3433,10 +3434,15 @@ public class BrokerService implements Closeable {
                                                                                
         Optional<Policies> policies) {
         final int defaultNumPartitions = 
pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies);
         final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-        checkArgument(defaultNumPartitions > 0,
-                "Default number of partitions should be more than 0");
-        checkArgument(maxPartitions <= 0 || defaultNumPartitions <= 
maxPartitions,
-                "Number of partitions should be less than or equal to " + 
maxPartitions);
+        if (defaultNumPartitions <= 0) {
+            return FutureUtil.failedFuture(
+                    new IllegalArgumentException("Default number of partitions 
should be more than 0"));
+        }
+        if (maxPartitions > 0 && defaultNumPartitions > maxPartitions) {
+            return FutureUtil.failedFuture(
+                    new IllegalArgumentException("Number of partitions should 
be less than or equal to "
+                            + maxPartitions));
+        }
 
         PartitionedTopicMetadata configMetadata = new 
PartitionedTopicMetadata(defaultNumPartitions);
 
@@ -3751,7 +3757,9 @@ public class BrokerService implements Closeable {
     }
 
     public @NonNull CompletableFuture<Boolean> 
isAllowAutoSubscriptionCreationAsync(@NonNull TopicName tpName) {
-        requireNonNull(tpName);
+        if (tpName == null) {
+            return FutureUtil.failedFuture(new NullPointerException("tpName"));
+        }
         // Policies priority: topic level -> namespace level -> broker level
         if (ExtensibleLoadManagerImpl.isInternalTopic(tpName.toString())) {
             return CompletableFuture.completedFuture(true);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index c3d88b9c723..ad18af308e2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -565,7 +565,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     @VisibleForTesting
     @NonNull CompletableFuture<Boolean> prepareInitPoliciesCacheAsync(@NonNull 
NamespaceName namespace) {
-        requireNonNull(namespace);
+        if (namespace == null) {
+            return FutureUtil.failedFuture(new NullPointerException("Expected 
NamespaceName should not be null"));
+        }
         if (closed.get()) {
             return CompletableFuture.completedFuture(false);
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 8f66d9c0e3e..fd3ffb1a34c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
 
@@ -291,7 +292,8 @@ public interface Topic {
      * Get the last message position that can be dispatch.
      */
     default CompletableFuture<Position> getLastDispatchablePosition() {
-        throw new UnsupportedOperationException("getLastDispatchablePosition 
is not supported by default");
+        return FutureUtil.failedFuture(
+                new UnsupportedOperationException("getLastDispatchablePosition 
is not supported by default"));
     }
 
     CompletableFuture<MessageId> getLastMessageId();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 96a9f97d70f..2b14df499ae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -1218,7 +1218,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 
     @Override
     public CompletableFuture<MessageId> getLastMessageId() {
-        throw new UnsupportedOperationException("getLastMessageId is not 
supported on non-persistent topic");
+        return FutureUtil.failedFuture(
+                new UnsupportedOperationException("getLastMessageId is not 
supported on non-persistent topic"));
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(NonPersistentTopic.class);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 97d5a7bc953..2bb6372c1cb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -60,6 +60,7 @@ import 
org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
@@ -139,7 +140,7 @@ public class PrometheusMetricsGenerator implements 
AutoCloseable {
 
         public synchronized CompletableFuture<ByteBuf> 
getCompressedBuffer(Executor executor) {
             if (released) {
-                throw new IllegalStateException("Already released!");
+                return FutureUtil.failedFuture(new 
IllegalStateException("Already released!"));
             }
             if (compressedBuffer == null) {
                 compressedBuffer = new CompletableFuture<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index 88ca099b4ca..20521780bff 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 
 /**
  * Pulsar system topic.
@@ -123,7 +124,7 @@ public interface SystemTopicClient<T> {
          * @return message id future
          */
         default CompletableFuture<MessageId> deleteAsync(String key, T t) {
-            throw new UnsupportedOperationException("Unsupported operation");
+            return FutureUtil.failedFuture(new 
UnsupportedOperationException("Unsupported operation"));
         }
 
         /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 65489eaa34b..9291097d271 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -213,7 +213,11 @@ public abstract class PulsarWebResource {
                     isClientAuthenticated(appId), appId);
         }
         String originalPrincipal = originalPrincipal();
-        validateOriginalPrincipal(appId, originalPrincipal);
+        try {
+            validateOriginalPrincipal(appId, originalPrincipal);
+        } catch (RestException e) {
+            return FutureUtil.failedFuture(e);
+        }
 
         if (pulsar.getConfiguration().getProxyRoles().contains(appId)) {
             BrokerService brokerService = pulsar.getBrokerService();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
index ff0fa0bcd12..b44480378c6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java
@@ -19,13 +19,13 @@
 package org.apache.pulsar.compaction;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.jspecify.annotations.NonNull;
 import org.jspecify.annotations.Nullable;
 
@@ -61,14 +61,18 @@ public class PulsarCompactionServiceFactory implements 
CompactionServiceFactory
 
     @Override
     public CompletableFuture<Void> initialize(@NonNull PulsarService 
pulsarService) {
-        Objects.requireNonNull(pulsarService);
+        if (pulsarService == null) {
+            return FutureUtil.failedFuture(new NullPointerException("Expected 
pulsarService should not be null"));
+        }
         this.pulsarService = pulsarService;
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public CompletableFuture<TopicCompactionService> 
newTopicCompactionService(@NonNull String topic) {
-        Objects.requireNonNull(topic);
+        if (topic == null) {
+            return FutureUtil.failedFuture(new NullPointerException("Expected 
topic should not be null"));
+        }
         PulsarTopicCompactionService pulsarTopicCompactionService =
                 new PulsarTopicCompactionService(topic, 
pulsarService.getBookKeeperClient(), () -> {
                     try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index 1b54092d9aa..3c674975742 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -76,7 +76,7 @@ public class StrategicTwoPhaseCompactor extends 
PublishingOrderCompactor {
     }
 
     public CompletableFuture<Long> compact(String topic) {
-        throw new UnsupportedOperationException();
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
     }
 
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 0d3f8a4c619..84198967601 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -27,7 +27,6 @@ import io.netty.handler.proxy.Socks5ProxyHandler;
 import io.netty.handler.ssl.SslHandler;
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -40,6 +39,7 @@ import 
org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.FrameDecoderUtil;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.PulsarSslConfiguration;
 import org.apache.pulsar.common.util.PulsarSslFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
@@ -106,10 +106,14 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
      * @return a {@link CompletableFuture} that completes when the TLS is set up.
      */
     CompletableFuture<Channel> initTls(Channel ch, InetSocketAddress sniHost) {
-        Objects.requireNonNull(ch, "A channel is required");
-        Objects.requireNonNull(sniHost, "A sniHost is required");
+        if (ch == null) {
+            return FutureUtil.failedFuture(new NullPointerException("A channel is required"));
+        }
+        if (sniHost == null) {
+            return FutureUtil.failedFuture(new NullPointerException("A sniHost is required"));
+        }
         if (!tlsEnabled) {
-            throw new IllegalStateException("TLS is not enabled in client configuration");
+            return FutureUtil.failedFuture(new IllegalStateException("TLS is not enabled in client configuration"));
         }
         CompletableFuture<Channel> initTlsFuture = new CompletableFuture<>();
         ch.eventLoop().execute(() -> {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index ae784a1b18d..1b43d17d9d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -228,10 +227,13 @@ public class FutureUtil {
         }
 
         /**
-         * @throws NullPointerException NPE when param is null
+         * @return a {@link CompletableFuture} representing the newly scheduled task,
+         * or one completed exceptionally with {@link NullPointerException} if param is null.
          */
         public synchronized CompletableFuture<T> sequential(Supplier<CompletableFuture<T>> newTask) {
-            Objects.requireNonNull(newTask);
+            if (newTask == null) {
+                return failedFuture(new NullPointerException("Expected Supplier should not be null"));
+            }
             if (sequencerFuture.isDone()) {
                 if (sequencerFuture.isCompletedExceptionally() && allowExceptionBreakChain) {
                     return sequencerFuture;
@@ -282,13 +284,18 @@ public class FutureUtil {
     }
 
     /**
-     * @throws RejectedExecutionException if this task cannot be accepted for execution
-     * @throws NullPointerException if one of params is null
+     * @return a {@link CompletableFuture} representing the asynchronous composition.
+     * The returned future is completed exceptionally with {@link NullPointerException} if one of params is null,
+     * or with {@link RejectedExecutionException} if the task cannot be accepted for execution.
      */
     public static <T> @NonNull CompletableFuture<T> composeAsync(Supplier<CompletableFuture<T>> futureSupplier,
                                                                  Executor executor) {
-        Objects.requireNonNull(futureSupplier);
-        Objects.requireNonNull(executor);
+        if (futureSupplier == null) {
+            return failedFuture(new NullPointerException("Expected Supplier should not be null"));
+        }
+        if (executor == null) {
+            return failedFuture(new NullPointerException("Expected Executor should not be null"));
+        }
         final CompletableFuture<T> future = new CompletableFuture<>();
         try {
             executor.execute(() -> futureSupplier.get().whenComplete((result, error) -> {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java
index 294c0db99ea..481d47a21c2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.common.util.netty;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.util.FutureUtil;
 
 /**
  * Static utility methods for operating on {@link ChannelFuture}s.
@@ -41,7 +41,9 @@ public class ChannelFutures {
      *         and completes exceptionally if the channelFuture completes with a {@link Throwable}
      */
     public static CompletableFuture<Channel> toCompletableFuture(ChannelFuture channelFuture) {
-        Objects.requireNonNull(channelFuture, "channelFuture cannot be null");
+        if (channelFuture == null) {
+            return FutureUtil.failedFuture(new NullPointerException("channelFuture cannot be null"));
+        }
 
         CompletableFuture<Channel> adapter = new CompletableFuture<>();
         if (channelFuture.isDone()) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java
index 8686a381679..109fa2fa265 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java
@@ -19,8 +19,8 @@
 package org.apache.pulsar.common.util.netty;
 
 import io.netty.util.concurrent.Future;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.util.FutureUtil;
 
 /**
  * Contains utility methods for working with Netty Futures.
@@ -34,7 +34,9 @@ public class NettyFutureUtil {
      * @return converted future instance
      */
     public static <V> CompletableFuture<V> toCompletableFuture(Future<V> future) {
-        Objects.requireNonNull(future, "future cannot be null");
+        if (future == null) {
+            return FutureUtil.failedFuture(new NullPointerException("future cannot be null"));
+        }
 
         CompletableFuture<V> adapter = new CompletableFuture<>();
         if (future.isDone()) {
@@ -62,7 +64,9 @@ public class NettyFutureUtil {
      * @return converted future instance
      */
     public static CompletableFuture<Void> toCompletableFutureVoid(Future<?> future) {
-        Objects.requireNonNull(future, "future cannot be null");
+        if (future == null) {
+            return FutureUtil.failedFuture(new NullPointerException("future cannot be null"));
+        }
 
         CompletableFuture<Void> adapter = new CompletableFuture<>();
         if (future.isDone()) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java
index 31c0eb4af2b..65b9e7b6339 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java
@@ -23,6 +23,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.DefaultChannelPromise;
 import io.netty.channel.DefaultEventLoop;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.mockito.Mock;
@@ -65,9 +66,16 @@ public class ChannelFuturesTest {
         channelFuture = new DefaultChannelPromise(channel);
     }
 
-    @Test(expectedExceptions = NullPointerException.class)
+    @Test
     public void toCompletableFuture_shouldRequireNonNullArgument() {
-        ChannelFutures.toCompletableFuture(null);
+        CompletableFuture<Channel> future = ChannelFutures.toCompletableFuture(null);
+        Assert.assertTrue(future.isCompletedExceptionally());
+        try {
+            future.join();
+            Assert.fail("Expected NullPointerException");
+        } catch (CompletionException e) {
+            Assert.assertTrue(e.getCause() instanceof NullPointerException);
+        }
     }
 
     @Test


Reply via email to