This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b26934b2ff24ab1d152dad01da349d3f8e5ae5e0 Author: Ruimin MA <[email protected]> AuthorDate: Tue Aug 12 15:24:06 2025 +0800 [fix][client]Prevent ZeroQueueConsumer from receiving batch messages when using MessagePayloadProcessor (#24610) (cherry picked from commit bbf56f651ebe357745b46654a1d9fb7dc4899fd7) --- .../pulsar/client/impl/ZeroQueueSizeTest.java | 68 ++++++++++++++++++++++ .../org/apache/pulsar/client/api/Consumer.java | 25 +++++--- .../apache/pulsar/client/api/ConsumerBuilder.java | 34 ++++++++++- .../apache/pulsar/client/impl/ConsumerImpl.java | 2 +- .../pulsar/client/impl/ZeroQueueConsumerImpl.java | 36 +++++++++--- 5 files changed, 145 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java index a9aee661a16..b9f48974e3f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java @@ -38,7 +38,11 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessagePayload; +import org.apache.pulsar.client.api.MessagePayloadContext; +import org.apache.pulsar.client.api.MessagePayloadProcessor; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; @@ -578,4 +582,68 @@ public class ZeroQueueSizeTest extends BrokerTestBase { assertEquals(receivedMessages.get(i).intValue(), i); } } + + @Test(timeOut = 30000) + public void testZeroQueueSizeConsumerWithPayloadProcessorReceiveBatchMessage() throws Exception { + String key = "payloadProcessorReceiveBatchMessage"; + + // 1. Config + final String topicName = "persistent://prop/use/ns-abc/topic-" + key; + final String subscriptionName = "my-ex-subscription-" + key; + final String messagePredicate = "my-message-" + key + "-"; + + // 2. Create Producer + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .batchingMaxMessages(5) + .enableBatching(true) + .create(); + + // 3. Create Consumer + ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer() + .topic(topicName) + .messagePayloadProcessor(new MessagePayloadProcessor() { + @Override + public <T> void process(MessagePayload payload, + MessagePayloadContext context, Schema<T> schema, + java.util.function.Consumer<Message<T>> messageConsumer) { + if (context.isBatch()) { + final int numMessages = context.getNumMessages(); + for (int i = 0; i < numMessages; i++) { + messageConsumer. + accept(context.getMessageAt(i, numMessages, + payload, true, schema) + ); + } + } else { + messageConsumer.accept(context.asSingleMessage(payload, schema)); + } + } + } + ) + .subscriptionName(subscriptionName) + .receiverQueueSize(0) + .subscribe(); + + ArrayList<CompletableFuture<MessageId>> futures = new ArrayList<>(); + // 3. producer publish batch-messages + for (int i = 0; i < totalMessages; i++) { + String message = messagePredicate + i; + futures.add(producer.sendAsync(message.getBytes())); + } + producer.flush(); + + // ensure all messages are sent as batch messages + for (CompletableFuture<MessageId> future : futures) { + assertTrue(future.get() instanceof BatchMessageIdImpl); + } + + // 4. consumer should throw PulsarClientException when call method receive() + assertThatThrownBy( + consumer::receive + ) + .isInstanceOf(PulsarClientException.class) + .hasMessage("java.lang.InterruptedException: Queue is terminated") + .hasCauseInstanceOf(InterruptedException.class); + producer.close(); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index beba2988e67..339a05d1b53 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -99,17 +99,25 @@ public interface Consumer<T> extends Closeable, MessageAcknowledger { */ CompletableFuture<Void> unsubscribeAsync(boolean force); /** - * Receives a single message. + * Receives a single message in blocking mode. * - * <p>This calls blocks until a message is available. + * <p>This method blocks until a message is available or the consumer is closed. * - * <p>When thread is Interrupted, return a null value and reset interrupted flag. + * <p>Behavior when interrupted: + * <ul> + * <li>If the thread is interrupted while waiting: returns null and resets the interrupted flag</li> + * <li>If the consumer is closed while waiting: throws {@link PulsarClientException} with the cause + * {@code InterruptedException("Queue is terminated")}</li> + * </ul> * - * @return the received message - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - * @throws PulsarClientException.InvalidConfigurationException - * if a message listener was defined in the configuration + * @return the received message, or null if the thread was interrupted + * @throws PulsarClientException if the consumer is closed while waiting for a message. + * The exception will contain an {@link InterruptedException} with the message + * "Queue is terminated" as its cause. + * @throws PulsarClientException.AlreadyClosedException if the consumer was already closed + * before this method was called + * @throws PulsarClientException.InvalidConfigurationException if a message listener + * was defined in the configuration */ Message<T> receive() throws PulsarClientException; @@ -135,6 +143,7 @@ public interface Consumer<T> extends Closeable, MessageAcknowledger { * Receive a single message. * * <p>Retrieves a message, waiting up to the specified wait time if necessary. + * <p>If consumer closes during wait: returns null immediately. * * @param timeout * 0 or less means immediate rather than infinite diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 8eaf5ca969f..fc7848838d0 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -254,7 +254,7 @@ public interface ConsumerBuilder<T> extends Cloneable { * but no more than 256ms. If set to k, the redelivery time will be bucketed by 2^k ms. * If the value is 0, the redelivery time will be accurate to ms. * - * @param negativeAckPrecisionBitCnt + * @param negativeAckPrecisionBitCount * The redelivery time precision bit count. * @return the consumer builder instance */ @@ -393,8 +393,9 @@ public interface ConsumerBuilder<T> extends Cloneable { * size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is * zero.</li> * <li>Doesn't support Batch-Message. If a consumer receives a batch-message, it closes the consumer connection with - * the broker and {@link Consumer#receive()} calls remain blocked while {@link Consumer#receiveAsync()} receives - * exception in callback. + * the broker and {@link Consumer#receive()} calls will throw {@link PulsarClientException} + * while {@link Consumer#receiveAsync()} receives + * {@link PulsarClientException} in callback. * * <b> The consumer is not able to receive any further messages unless batch-message in pipeline * is removed.</b></li> @@ -821,6 +822,33 @@ public interface ConsumerBuilder<T> extends Cloneable { * If configured with a non-null value, the consumer uses the processor to process the payload, including * decoding it to messages and triggering the listener. * + * <p><b>Special behavior when {@link #receiverQueueSize(int) receiverQueueSize=0}:</b> + * When the consumer is configured with {@link #receiverQueueSize(int) receiverQueueSize=0}: + * <ul> + * <li>For <b>batch messages</b>: + * <ul> + * <li>The payload processor will <i>not</i> be invoked</li> + * <li>The consumer will <b>immediately close itself</b> upon receiving batch messages</li> + * <li>Pending operations will fail with: + * <ul> + * <li>{@code receive()}: throws {@link PulsarClientException}</li> + * <li>{@code receiveAsync()}: completes the Future with {@link PulsarClientException}</li> + * <li>Message listeners: triggers {@link Consumer#close()} without delivering the message</li> + * </ul> + * </li> + * </ul> + * </li> + * <li>For <b>single messages</b>: + * <ul> + * <li>The payload processor will process messages normally</li> + * </ul> + * </li> + * </ul> + * + * + * <p><b>Default behavior {@link #receiverQueueSize(int) receiverQueueSize>0}:</b> + * All messages (both single and batched) will be processed by the payload processor. + * * Default: null */ ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a1f8a7753b5..bee9eb68d06 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1367,7 +1367,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle }); } - private void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMetadata, + protected void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMetadata, final MessageMetadata messageMetadata, final ByteBuf byteBuf, final MessageIdImpl messageId, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index e0011ae610c..cba7af69f8b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -195,9 +195,35 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata, int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx, long consumerEpoch) { + + rejectBatchMessageByClosingConsumer( + new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()) + ); + } + + @Override + protected void setCurrentReceiverQueueSize(int newSize) { + //receiver queue size is fixed as 0. + throw new NotImplementedException("Receiver queue size can't be changed in ZeroQueueConsumerImpl"); + } + + @Override + protected void processPayloadByProcessor(BrokerEntryMetadata brokerEntryMetadata, + MessageMetadata messageMetadata, ByteBuf byteBuf, + MessageIdImpl messageId, Schema<T> schema, + int redeliveryCount, List<Long> ackSet, long consumerEpoch) { + if (this.isBatch(messageMetadata)) { + rejectBatchMessageByClosingConsumer(messageId); + } else { + super.processPayloadByProcessor(brokerEntryMetadata, messageMetadata, byteBuf, messageId, schema, + redeliveryCount, ackSet, consumerEpoch); + } + } + + private void rejectBatchMessageByClosingConsumer(MessageIdImpl messageId) { log.warn( - "Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size", - subscription, consumerName); + "Closing consumer [{}]-[{}] due to unsupported received batch-message: {} with zero receiver queue size", + subscription, consumerName, messageId); // close connection closeAsync().handle((ok, e) -> { // notify callback with failure result @@ -208,10 +234,4 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { return null; }); } - - @Override - protected void setCurrentReceiverQueueSize(int newSize) { - //receiver queue size is fixed as 0. - throw new NotImplementedException("Receiver queue size can't be changed in ZeroQueueConsumerImpl"); - } }
