This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new db85c8ce4bb [fix][client] NPE in MultiTopicsConsumerImpl.negativeAcknowledge (#24476)
db85c8ce4bb is described below

commit db85c8ce4bb36eb86fa9219a51b3b530569ee178
Author: Penghui Li <[email protected]>
AuthorDate: Wed Jul 2 18:58:39 2025 -0700

    [fix][client] NPE in MultiTopicsConsumerImpl.negativeAcknowledge (#24476)
---
 .../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 04dfe5cc950..727a71a9b8d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -571,6 +571,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     @Override
     public void negativeAcknowledge(MessageId messageId) {
+        if (getState() != State.Ready) {
+            log.warn("[{}] [{}] Cannot negative acknowledge message {} - consumer is not ready (state: {})",
+                    topic, subscription, messageId, getState());
+            return;
+        }
         checkArgument(messageId instanceof TopicMessageId);
         ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic());
         consumer.negativeAcknowledge(messageId);
@@ -579,6 +584,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     @Override
     public void negativeAcknowledge(Message<?> message) {
+        if (getState() != State.Ready) {
+            log.warn("[{}] [{}] Cannot negative acknowledge message {} - consumer is not ready (state: {})",
+                    topic, subscription, message.getMessageId(), getState());
+            return;
+        }
         MessageId messageId = message.getMessageId();
         checkArgument(messageId instanceof TopicMessageId);
         ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic());

Reply via email to