This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new db85c8ce4bb [fix][client] NPE in
MultiTopicsConsumerImpl.negativeAcknowledge (#24476)
db85c8ce4bb is described below
commit db85c8ce4bb36eb86fa9219a51b3b530569ee178
Author: Penghui Li <[email protected]>
AuthorDate: Wed Jul 2 18:58:39 2025 -0700
[fix][client] NPE in MultiTopicsConsumerImpl.negativeAcknowledge (#24476)
---
.../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 04dfe5cc950..727a71a9b8d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -571,6 +571,11 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
@Override
public void negativeAcknowledge(MessageId messageId) {
+ if (getState() != State.Ready) {
+ log.warn("[{}] [{}] Cannot negative acknowledge message {} -
consumer is not ready (state: {})",
+ topic, subscription, messageId, getState());
+ return;
+ }
checkArgument(messageId instanceof TopicMessageId);
ConsumerImpl<T> consumer = consumers.get(((TopicMessageId)
messageId).getOwnerTopic());
consumer.negativeAcknowledge(messageId);
@@ -579,6 +584,11 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
@Override
public void negativeAcknowledge(Message<?> message) {
+ if (getState() != State.Ready) {
+ log.warn("[{}] [{}] Cannot negative acknowledge message {} -
consumer is not ready (state: {})",
+ topic, subscription, message.getMessageId(), getState());
+ return;
+ }
MessageId messageId = message.getMessageId();
checkArgument(messageId instanceof TopicMessageId);
ConsumerImpl<T> consumer = consumers.get(((TopicMessageId)
messageId).getOwnerTopic());