This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bbf56f651eb [fix][client]Prevent ZeroQueueConsumer from receiving 
batch messages when using MessagePayloadProcessor (#24610)
bbf56f651eb is described below

commit bbf56f651ebe357745b46654a1d9fb7dc4899fd7
Author: Ruimin MA <[email protected]>
AuthorDate: Tue Aug 12 15:24:06 2025 +0800

    [fix][client]Prevent ZeroQueueConsumer from receiving batch messages when 
using MessagePayloadProcessor (#24610)
---
 .../pulsar/client/impl/ZeroQueueSizeTest.java      | 68 ++++++++++++++++++++++
 .../org/apache/pulsar/client/api/Consumer.java     | 25 +++++---
 .../apache/pulsar/client/api/ConsumerBuilder.java  | 34 ++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  2 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  | 36 +++++++++---
 5 files changed, 145 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
index a9aee661a16..b9f48974e3f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
@@ -38,7 +38,11 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessagePayload;
+import org.apache.pulsar.client.api.MessagePayloadContext;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -578,4 +582,68 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
             assertEquals(receivedMessages.get(i).intValue(), i);
         }
     }
+
+    @Test(timeOut = 30000)
+    public void 
testZeroQueueSizeConsumerWithPayloadProcessorReceiveBatchMessage() throws 
Exception {
+        String key = "payloadProcessorReceiveBatchMessage";
+
+        // 1. Config
+        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+
+        // 2. Create Producer
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .batchingMaxMessages(5)
+                .enableBatching(true)
+                .create();
+
+        // 3. Create Consumer
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer()
+                .topic(topicName)
+                .messagePayloadProcessor(new MessagePayloadProcessor() {
+                                             @Override
+                                             public <T> void 
process(MessagePayload payload,
+                                                             
MessagePayloadContext context, Schema<T> schema,
+                                                             
java.util.function.Consumer<Message<T>> messageConsumer) {
+                                                 if (context.isBatch()) {
+                                                     final int numMessages = 
context.getNumMessages();
+                                                     for (int i = 0; i < 
numMessages; i++) {
+                                                         messageConsumer.
+                                                                 
accept(context.getMessageAt(i, numMessages,
+                                                                         
payload, true, schema)
+                                                                 );
+                                                     }
+                                                 } else {
+                                                     
messageConsumer.accept(context.asSingleMessage(payload, schema));
+                                                 }
+                                             }
+                                         }
+                )
+                .subscriptionName(subscriptionName)
+                .receiverQueueSize(0)
+                .subscribe();
+
+        ArrayList<CompletableFuture<MessageId>> futures = new ArrayList<>();
+        // 3. producer publish batch-messages
+        for (int i = 0; i < totalMessages; i++) {
+            String message = messagePredicate + i;
+            futures.add(producer.sendAsync(message.getBytes()));
+        }
+        producer.flush();
+
+        // ensure all messages are sent as batch messages
+        for (CompletableFuture<MessageId> future : futures) {
+            assertTrue(future.get() instanceof BatchMessageIdImpl);
+        }
+
+        // 4. consumer should throw PulsarClientException when call method 
receive()
+        assertThatThrownBy(
+                consumer::receive
+        )
+                .isInstanceOf(PulsarClientException.class)
+                .hasMessage("java.lang.InterruptedException: Queue is 
terminated")
+                .hasCauseInstanceOf(InterruptedException.class);
+        producer.close();
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index beba2988e67..339a05d1b53 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -99,17 +99,25 @@ public interface Consumer<T> extends Closeable, 
MessageAcknowledger {
      */
     CompletableFuture<Void> unsubscribeAsync(boolean force);
     /**
-     * Receives a single message.
+     * Receives a single message in blocking mode.
      *
-     * <p>This calls blocks until a message is available.
+     * <p>This method blocks until a message is available or the consumer is 
closed.
      *
-     * <p>When thread is Interrupted, return a null value and reset 
interrupted flag.
+     * <p>Behavior when interrupted:
+     * <ul>
+     *   <li>If the thread is interrupted while waiting: returns null and 
resets the interrupted flag</li>
+     *   <li>If the consumer is closed while waiting: throws {@link 
PulsarClientException} with the cause
+     *       {@code InterruptedException("Queue is terminated")}</li>
+     * </ul>
      *
-     * @return the received message
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     * @throws PulsarClientException.InvalidConfigurationException
-     *             if a message listener was defined in the configuration
+     * @return the received message, or null if the thread was interrupted
+     * @throws PulsarClientException if the consumer is closed while waiting 
for a message.
+     *         The exception will contain an {@link InterruptedException} with 
the message
+     *         "Queue is terminated" as its cause.
+     * @throws PulsarClientException.AlreadyClosedException if the consumer 
was already closed
+     *         before this method was called
+     * @throws PulsarClientException.InvalidConfigurationException if a 
message listener
+     *         was defined in the configuration
      */
     Message<T> receive() throws PulsarClientException;
 
@@ -135,6 +143,7 @@ public interface Consumer<T> extends Closeable, 
MessageAcknowledger {
      * Receive a single message.
      *
      * <p>Retrieves a message, waiting up to the specified wait time if 
necessary.
+     * <p>If consumer closes during wait: returns null immediately.
      *
      * @param timeout
      *            0 or less means immediate rather than infinite
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 142c4741149..70904c0897a 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -254,7 +254,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * but no more than 256ms. If set to k, the redelivery time will be 
bucketed by 2^k ms.
      * If the value is 0, the redelivery time will be accurate to ms.
      *
-     * @param negativeAckPrecisionBitCnt
+     * @param negativeAckPrecisionBitCount
      *            The redelivery time precision bit count.
      * @return the consumer builder instance
      */
@@ -393,8 +393,9 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * size is zero. {@link Consumer#receive()} function call should not be 
interrupted when the consumer queue size is
      * zero.</li>
      * <li>Doesn't support Batch-Message. If a consumer receives a 
batch-message, it closes the consumer connection with
-     * the broker and {@link Consumer#receive()} calls remain blocked while 
{@link Consumer#receiveAsync()} receives
-     * exception in callback.
+     * the broker and {@link Consumer#receive()} calls will throw {@link 
PulsarClientException}
+     * while {@link Consumer#receiveAsync()} receives
+     * {@link PulsarClientException} in callback.
      *
      * <b> The consumer is not able to receive any further messages unless 
batch-message in pipeline
      * is removed.</b></li>
@@ -822,6 +823,33 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * If configured with a non-null value, the consumer uses the processor to 
process the payload, including
      * decoding it to messages and triggering the listener.
      *
+     * <p><b>Special behavior when {@link #receiverQueueSize(int) 
receiverQueueSize=0}:</b>
+     * When the consumer is configured with {@link #receiverQueueSize(int) 
receiverQueueSize=0}:
+     * <ul>
+     *   <li>For <b>batch messages</b>:
+     *     <ul>
+     *       <li>The payload processor will <i>not</i> be invoked</li>
+     *       <li>The consumer will <b>immediately close itself</b> upon 
receiving batch messages</li>
+     *       <li>Pending operations will fail with:
+     *         <ul>
+     *           <li>{@code receive()}: throws {@link 
PulsarClientException}</li>
+     *           <li>{@code receiveAsync()}: completes the Future with {@link 
PulsarClientException}</li>
+     *           <li>Message listeners: triggers {@link Consumer#close()} 
without delivering the message</li>
+     *         </ul>
+     *       </li>
+     *     </ul>
+     *   </li>
+     *   <li>For <b>single messages</b>:
+     *     <ul>
+     *       <li>The payload processor will process messages normally</li>
+     *     </ul>
+     *   </li>
+     * </ul>
+     *
+     *
+     * <p><b>Default behavior {@link #receiverQueueSize(int) 
receiverQueueSize>0}:</b>
+     * All messages (both single and batched) will be processed by the payload 
processor.
+     *
      * Default: null
      */
     ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor 
payloadProcessor);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b9fe9fba810..15b3f56a8c9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1382,7 +1382,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         });
     }
 
-    private void processPayloadByProcessor(final BrokerEntryMetadata 
brokerEntryMetadata,
+    protected void processPayloadByProcessor(final BrokerEntryMetadata 
brokerEntryMetadata,
                                            final MessageMetadata 
messageMetadata,
                                            final ByteBuf byteBuf,
                                            final MessageIdImpl messageId,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index a15e0459a79..01ff4d57782 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -196,9 +196,35 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
                                             int redeliveryCount, List<Long> 
ackSet, ByteBuf uncompressedPayload,
                                             MessageIdData messageId, ClientCnx 
cnx, long consumerEpoch,
                                             boolean isEncrypted) {
+
+        rejectBatchMessageByClosingConsumer(
+                new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), getPartitionIndex())
+        );
+    }
+
+    @Override
+    protected void setCurrentReceiverQueueSize(int newSize) {
+        //receiver queue size is fixed as 0.
+        throw new NotImplementedException("Receiver queue size can't be 
changed in ZeroQueueConsumerImpl");
+    }
+
+    @Override
+    protected void processPayloadByProcessor(BrokerEntryMetadata 
brokerEntryMetadata,
+                                             MessageMetadata messageMetadata, 
ByteBuf byteBuf,
+                                             MessageIdImpl messageId, 
Schema<T> schema,
+                                             int redeliveryCount, List<Long> 
ackSet, long consumerEpoch) {
+        if (this.isBatch(messageMetadata)) {
+            rejectBatchMessageByClosingConsumer(messageId);
+        } else {
+            super.processPayloadByProcessor(brokerEntryMetadata, 
messageMetadata, byteBuf, messageId, schema,
+                    redeliveryCount, ackSet, consumerEpoch);
+        }
+    }
+
+    private void rejectBatchMessageByClosingConsumer(MessageIdImpl messageId) {
         log.warn(
-                "Closing consumer [{}]-[{}] due to unsupported received 
batch-message with zero receiver queue size",
-                subscription, consumerName);
+            "Closing consumer [{}]-[{}] due to unsupported received 
batch-message: {} with zero receiver queue size",
+            subscription, consumerName, messageId);
         // close connection
         closeAsync().handle((ok, e) -> {
             // notify callback with failure result
@@ -209,10 +235,4 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
             return null;
         });
     }
-
-    @Override
-    protected void setCurrentReceiverQueueSize(int newSize) {
-        //receiver queue size is fixed as 0.
-        throw new NotImplementedException("Receiver queue size can't be 
changed in ZeroQueueConsumerImpl");
-    }
 }

Reply via email to