This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new de5221b2a45 [fix][client] Fix async APIs to return failed futures on
validation errors (#25287)
de5221b2a45 is described below
commit de5221b2a45f17e2de17ec5bdf1ab96c3e4b309e
Author: Hao Zhang <[email protected]>
AuthorDate: Mon Mar 9 19:42:23 2026 +0800
[fix][client] Fix async APIs to return failed futures on validation errors
(#25287)
Co-authored-by: 张浩 <[email protected]>
---
.../pulsar/client/impl/MessageChunkingTest.java | 37 ----------------------
.../apache/pulsar/client/impl/ConsumerBase.java | 11 +++++--
.../client/impl/MultiTopicsConsumerImpl.java | 4 ++-
.../pulsar/client/impl/ProducerBuilderImpl.java | 6 ++--
.../client/impl/MultiTopicsConsumerImplTest.java | 11 +++++++
.../client/impl/ProducerBuilderImplTest.java | 20 ++++++++++++
6 files changed, 46 insertions(+), 43 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 1a122b94416..d8a3698636d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -99,19 +99,6 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
return new Object[][] { { true }, { false } };
}
- @Test
- public void testInvalidConfig() throws Exception {
- final String topicName = "persistent://my-property/my-ns/my-topic1";
- ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(topicName);
- // batching and chunking can't be enabled together
- try {
- Producer<byte[]> producer =
producerBuilder.enableChunking(true).enableBatching(true).create();
- fail("producer creation should have fail");
- } catch (IllegalArgumentException ie) {
- // Ok
- }
- }
-
@Test(dataProvider = "ackReceiptEnabledWithMaxMessageSize")
public void testLargeMessage(boolean ackReceiptEnabled, boolean
clientSizeMaxMessageSize) throws Exception {
@@ -448,30 +435,6 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
Assert.assertEquals(((ConsumerImpl<String>)
consumer).getAvailablePermits(), 8);
}
- /**
- * Validate that chunking is not supported with batching and
non-persistent topic.
- *
- * @throws Exception
- */
- @Test
- public void testInvalidUseCaseForChunking() throws Exception {
-
- log.info("-- Starting {} test --", methodName);
- this.conf.setMaxMessageSize(5);
- final String topicName = "persistent://my-property/my-ns/my-topic1";
-
- ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(topicName);
-
- try {
- Producer<byte[]> producer =
producerBuilder.enableChunking(true).enableBatching(true).create();
- fail("it should have failied because chunking can't be used with
batching enabled");
- } catch (IllegalArgumentException ie) {
- // Ok
- }
-
- log.info("-- Exiting {} test --", methodName);
- }
-
@Test
public void testExpireIncompleteChunkMessage() throws Exception{
final String topicName = "persistent://prop/use/ns-abc/expireMsg";
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 1755c94b0de..aed525c9eee 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;
-import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
@@ -662,7 +661,10 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
TransactionImpl txnImpl = null;
if (null != txn) {
- checkArgument(txn instanceof TransactionImpl);
+ if (!(txn instanceof TransactionImpl)) {
+ return FutureUtil.failedFuture(new IllegalArgumentException(
+ "Expected txn to be an instance of TransactionImpl,
but got " + txn.getClass().getName()));
+ }
txnImpl = (TransactionImpl) txn;
CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
if (!txnImpl.checkIfOpen(completableFuture)) {
@@ -691,7 +693,10 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
TransactionImpl txnImpl = null;
if (null != txn) {
- checkArgument(txn instanceof TransactionImpl);
+ if (!(txn instanceof TransactionImpl)) {
+ return FutureUtil.failedFuture(new IllegalArgumentException(
+ "Expected txn to be an instance of TransactionImpl,
but got " + txn.getClass().getName()));
+ }
txnImpl = (TransactionImpl) txn;
}
return doAcknowledgeWithTxn(messageId, AckType.Cumulative,
Collections.emptyMap(), txnImpl);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8061da4e91c..7865939d77d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1247,7 +1247,9 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
// un-subscribe a given topic
public CompletableFuture<Void> unsubscribeAsync(String topicName) {
- checkArgument(TopicName.isValid(topicName), "Invalid topic name:" +
topicName);
+ if (!TopicName.isValid(topicName)) {
+ return FutureUtil.failedFuture(new
IllegalArgumentException("Invalid topic name:" + topicName));
+ }
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index e176cc41bc6..fe79d618c6a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -93,8 +93,10 @@ public class ProducerBuilderImpl<T> implements
ProducerBuilder<T> {
@Override
public CompletableFuture<Producer<T>> createAsync() {
// config validation
- checkArgument(!(conf.isBatchingEnabled() && conf.isChunkingEnabled()),
- "Batching and chunking of messages can't be enabled together");
+ if (conf.isBatchingEnabled() && conf.isChunkingEnabled()) {
+ return FutureUtil.failedFuture(
+ new IllegalArgumentException("Batching and chunking of
messages can't be enabled together"));
+ }
if (conf.getTopicName() == null) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic name
must be set on the producer builder"));
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 02a4d2ebba8..54175613e3b 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -39,6 +39,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -223,6 +224,16 @@ public class MultiTopicsConsumerImplTest {
verify(clientMock, times(1)).cleanupConsumer(any());
}
+ @Test
+ public void testUnsubscribeAsyncInvalidTopicNameReturnsFailedFuture() {
+ MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
+ CompletableFuture<Void> future =
consumer.unsubscribeAsync("persistent://public/invalid-topic");
+
+ assertTrue(future.isCompletedExceptionally());
+ CompletionException ex = expectThrows(CompletionException.class,
future::join);
+ assertTrue(ex.getCause() instanceof IllegalArgumentException);
+ }
+
@Test
public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics()
throws Exception {
ExecutorProvider executorProvider = mock(ExecutorProvider.class);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index 45e31e00b4c..738614398b1 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -23,9 +23,12 @@ import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
@@ -385,6 +388,23 @@ public class ProducerBuilderImplTest {
producerBuilderImpl.maxPendingMessagesAcrossPartitions(1000);
}
+ @Test
+ public void testCreateAsyncFailsWhenBatchingAndChunkingEnabled() {
+ producerBuilderImpl = new ProducerBuilderImpl<>(client, Schema.BYTES);
+ CompletableFuture<Producer<byte[]>> future =
producerBuilderImpl.topic(TOPIC_NAME)
+ .enableBatching(true)
+ .enableChunking(true)
+ .createAsync();
+
+ assertTrue(future.isCompletedExceptionally());
+ try {
+ future.join();
+ fail("Expected IllegalArgumentException");
+ } catch (CompletionException e) {
+ assertTrue(e.getCause() instanceof IllegalArgumentException);
+ }
+ }
+
private class CustomMessageRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {