This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bbf56f651eb [fix][client]Prevent ZeroQueueConsumer from receiving
batch messages when using MessagePayloadProcessor (#24610)
bbf56f651eb is described below
commit bbf56f651ebe357745b46654a1d9fb7dc4899fd7
Author: Ruimin MA <[email protected]>
AuthorDate: Tue Aug 12 15:24:06 2025 +0800
[fix][client]Prevent ZeroQueueConsumer from receiving batch messages when
using MessagePayloadProcessor (#24610)
---
.../pulsar/client/impl/ZeroQueueSizeTest.java | 68 ++++++++++++++++++++++
.../org/apache/pulsar/client/api/Consumer.java | 25 +++++---
.../apache/pulsar/client/api/ConsumerBuilder.java | 34 ++++++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 36 +++++++++---
5 files changed, 145 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
index a9aee661a16..b9f48974e3f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
@@ -38,7 +38,11 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessagePayload;
+import org.apache.pulsar.client.api.MessagePayloadContext;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
@@ -578,4 +582,68 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
assertEquals(receivedMessages.get(i).intValue(), i);
}
}
+
+ @Test(timeOut = 30000)
+ public void
testZeroQueueSizeConsumerWithPayloadProcessorReceiveBatchMessage() throws
Exception {
+ String key = "payloadProcessorReceiveBatchMessage";
+
+ // 1. Config
+ final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
+ final String subscriptionName = "my-ex-subscription-" + key;
+ final String messagePredicate = "my-message-" + key + "-";
+
+ // 2. Create Producer
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .batchingMaxMessages(5)
+ .enableBatching(true)
+ .create();
+
+ // 3. Create Consumer
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer()
+ .topic(topicName)
+ .messagePayloadProcessor(new MessagePayloadProcessor() {
+ @Override
+ public <T> void
process(MessagePayload payload,
+
MessagePayloadContext context, Schema<T> schema,
+
java.util.function.Consumer<Message<T>> messageConsumer) {
+ if (context.isBatch()) {
+ final int numMessages =
context.getNumMessages();
+ for (int i = 0; i <
numMessages; i++) {
+ messageConsumer.
+
accept(context.getMessageAt(i, numMessages,
+
payload, true, schema)
+ );
+ }
+ } else {
+
messageConsumer.accept(context.asSingleMessage(payload, schema));
+ }
+ }
+ }
+ )
+ .subscriptionName(subscriptionName)
+ .receiverQueueSize(0)
+ .subscribe();
+
+ ArrayList<CompletableFuture<MessageId>> futures = new ArrayList<>();
+ // 3. producer publish batch-messages
+ for (int i = 0; i < totalMessages; i++) {
+ String message = messagePredicate + i;
+ futures.add(producer.sendAsync(message.getBytes()));
+ }
+ producer.flush();
+
+ // ensure all messages are sent as batch messages
+ for (CompletableFuture<MessageId> future : futures) {
+ assertTrue(future.get() instanceof BatchMessageIdImpl);
+ }
+
+ // 4. consumer should throw PulsarClientException when call method
receive()
+ assertThatThrownBy(
+ consumer::receive
+ )
+ .isInstanceOf(PulsarClientException.class)
+ .hasMessage("java.lang.InterruptedException: Queue is
terminated")
+ .hasCauseInstanceOf(InterruptedException.class);
+ producer.close();
+ }
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index beba2988e67..339a05d1b53 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -99,17 +99,25 @@ public interface Consumer<T> extends Closeable,
MessageAcknowledger {
*/
CompletableFuture<Void> unsubscribeAsync(boolean force);
/**
- * Receives a single message.
+ * Receives a single message in blocking mode.
*
- * <p>This calls blocks until a message is available.
+ * <p>This method blocks until a message is available or the consumer is
closed.
*
- * <p>When thread is Interrupted, return a null value and reset
interrupted flag.
+ * <p>Behavior when interrupted:
+ * <ul>
+ * <li>If the thread is interrupted while waiting: returns null and
resets the interrupted flag</li>
+ * <li>If the consumer is closed while waiting: throws {@link
PulsarClientException} with the cause
+ * {@code InterruptedException("Queue is terminated")}</li>
+ * </ul>
*
- * @return the received message
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- * @throws PulsarClientException.InvalidConfigurationException
- * if a message listener was defined in the configuration
+ * @return the received message, or null if the thread was interrupted
+ * @throws PulsarClientException if the consumer is closed while waiting
for a message.
+ * The exception will contain an {@link InterruptedException} with
the message
+ * "Queue is terminated" as its cause.
+ * @throws PulsarClientException.AlreadyClosedException if the consumer
was already closed
+ * before this method was called
+ * @throws PulsarClientException.InvalidConfigurationException if a
message listener
+ * was defined in the configuration
*/
Message<T> receive() throws PulsarClientException;
@@ -135,6 +143,7 @@ public interface Consumer<T> extends Closeable,
MessageAcknowledger {
* Receive a single message.
*
* <p>Retrieves a message, waiting up to the specified wait time if
necessary.
+ * <p>If consumer closes during wait: returns null immediately.
*
* @param timeout
* 0 or less means immediate rather than infinite
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 142c4741149..70904c0897a 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -254,7 +254,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* but no more than 256ms. If set to k, the redelivery time will be
bucketed by 2^k ms.
* If the value is 0, the redelivery time will be accurate to ms.
*
- * @param negativeAckPrecisionBitCnt
+ * @param negativeAckPrecisionBitCount
* The redelivery time precision bit count.
* @return the consumer builder instance
*/
@@ -393,8 +393,9 @@ public interface ConsumerBuilder<T> extends Cloneable {
* size is zero. {@link Consumer#receive()} function call should not be
interrupted when the consumer queue size is
* zero.</li>
* <li>Doesn't support Batch-Message. If a consumer receives a
batch-message, it closes the consumer connection with
- * the broker and {@link Consumer#receive()} calls remain blocked while
{@link Consumer#receiveAsync()} receives
- * exception in callback.
+ * the broker and {@link Consumer#receive()} calls will throw {@link
PulsarClientException}
+ * while {@link Consumer#receiveAsync()} receives
+ * {@link PulsarClientException} in callback.
*
* <b> The consumer is not able to receive any further messages unless
batch-message in pipeline
* is removed.</b></li>
@@ -822,6 +823,33 @@ public interface ConsumerBuilder<T> extends Cloneable {
* If configured with a non-null value, the consumer uses the processor to
process the payload, including
* decoding it to messages and triggering the listener.
*
+ * <p><b>Special behavior when {@link #receiverQueueSize(int)
receiverQueueSize=0}:</b>
+ * When the consumer is configured with {@link #receiverQueueSize(int)
receiverQueueSize=0}:
+ * <ul>
+ * <li>For <b>batch messages</b>:
+ * <ul>
+ * <li>The payload processor will <i>not</i> be invoked</li>
+ * <li>The consumer will <b>immediately close itself</b> upon
receiving batch messages</li>
+ * <li>Pending operations will fail with:
+ * <ul>
+ * <li>{@code receive()}: throws {@link
PulsarClientException}</li>
+ * <li>{@code receiveAsync()}: completes the Future with {@link
PulsarClientException}</li>
+ * <li>Message listeners: triggers {@link Consumer#close()}
without delivering the message</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * </li>
+ * <li>For <b>single messages</b>:
+ * <ul>
+ * <li>The payload processor will process messages normally</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ *
+ * <p><b>Default behavior {@link #receiverQueueSize(int)
receiverQueueSize>0}:</b>
+ * All messages (both single and batched) will be processed by the payload
processor.
+ *
* Default: null
*/
ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor
payloadProcessor);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b9fe9fba810..15b3f56a8c9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1382,7 +1382,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
});
}
- private void processPayloadByProcessor(final BrokerEntryMetadata
brokerEntryMetadata,
+ protected void processPayloadByProcessor(final BrokerEntryMetadata
brokerEntryMetadata,
final MessageMetadata
messageMetadata,
final ByteBuf byteBuf,
final MessageIdImpl messageId,
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index a15e0459a79..01ff4d57782 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -196,9 +196,35 @@ public class ZeroQueueConsumerImpl<T> extends
ConsumerImpl<T> {
int redeliveryCount, List<Long>
ackSet, ByteBuf uncompressedPayload,
MessageIdData messageId, ClientCnx
cnx, long consumerEpoch,
boolean isEncrypted) {
+
+ rejectBatchMessageByClosingConsumer(
+ new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex())
+ );
+ }
+
+ @Override
+ protected void setCurrentReceiverQueueSize(int newSize) {
+ //receiver queue size is fixed as 0.
+ throw new NotImplementedException("Receiver queue size can't be
changed in ZeroQueueConsumerImpl");
+ }
+
+ @Override
+ protected void processPayloadByProcessor(BrokerEntryMetadata
brokerEntryMetadata,
+ MessageMetadata messageMetadata,
ByteBuf byteBuf,
+ MessageIdImpl messageId,
Schema<T> schema,
+ int redeliveryCount, List<Long>
ackSet, long consumerEpoch) {
+ if (this.isBatch(messageMetadata)) {
+ rejectBatchMessageByClosingConsumer(messageId);
+ } else {
+ super.processPayloadByProcessor(brokerEntryMetadata,
messageMetadata, byteBuf, messageId, schema,
+ redeliveryCount, ackSet, consumerEpoch);
+ }
+ }
+
+ private void rejectBatchMessageByClosingConsumer(MessageIdImpl messageId) {
log.warn(
- "Closing consumer [{}]-[{}] due to unsupported received
batch-message with zero receiver queue size",
- subscription, consumerName);
+ "Closing consumer [{}]-[{}] due to unsupported received
batch-message: {} with zero receiver queue size",
+ subscription, consumerName, messageId);
// close connection
closeAsync().handle((ok, e) -> {
// notify callback with failure result
@@ -209,10 +235,4 @@ public class ZeroQueueConsumerImpl<T> extends
ConsumerImpl<T> {
return null;
});
}
-
- @Override
- protected void setCurrentReceiverQueueSize(int newSize) {
- //receiver queue size is fixed as 0.
- throw new NotImplementedException("Receiver queue size can't be
changed in ZeroQueueConsumerImpl");
- }
}