This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch consumer_aware_queue_change
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 3dfe73125ac0463dc3a3968a0cfacb0a0020ae9c
Author: Li Zhanhui <lizhan...@gmail.com>
AuthorDate: Fri Nov 10 15:25:58 2023 +0800

    let consumer be aware of message queue assignment change
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 .../rocketmq/client/consumer/DefaultMQPushConsumer.java     | 13 +++++++++++++
 .../org/apache/rocketmq/client/consumer/MQConsumer.java     |  8 +++++---
 .../rocketmq/client/consumer/MessageQueueListener.java      |  5 ++---
 .../client/impl/consumer/DefaultMQPushConsumerImpl.java     | 10 +++++++++-
 .../rocketmq/client/impl/consumer/RebalancePushImpl.java    |  8 +++++++-
 5 files changed, 36 insertions(+), 8 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 1afb9113eb..e593a17c98 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      */
     private MessageListener messageListener;
 
+    /**
+     * Listener to call if message queue assignment is changed.
+     */
+    private MessageQueueListener messageQueueListener;
+
     /**
      * Offset Storage
      */
@@ -987,4 +992,12 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     public void setClientRebalance(boolean clientRebalance) {
         this.clientRebalance = clientRebalance;
     }
+
+    public MessageQueueListener getMessageQueueListener() {
+        return messageQueueListener;
+    }
+
+    public void setMessageQueueListener(MessageQueueListener 
messageQueueListener) {
+        this.messageQueueListener = messageQueueListener;
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
index f4a8eda23a..81e06ee417 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
@@ -29,20 +29,22 @@ import 
org.apache.rocketmq.remoting.exception.RemotingException;
  */
 public interface MQConsumer extends MQAdmin {
     /**
-     * If consuming failure,message will be send back to the brokers,and delay 
consuming some time
+     * If consuming a message fails, it will be sent back to the brokers 
for another delivery attempt after
+     * the interval specified by the delay level.
      */
     @Deprecated
     void sendMessageBack(final MessageExt msg, final int delayLevel) throws 
RemotingException,
         MQBrokerException, InterruptedException, MQClientException;
 
     /**
-     * If consuming failure,message will be send back to the broker,and delay 
consuming some time
+     * If consuming a message fails, it will be sent back to the broker 
for another delivery attempt after
+     * the interval specified by the delay level.
      */
     void sendMessageBack(final MessageExt msg, final int delayLevel, final 
String brokerName)
         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException;
 
     /**
-     * Fetch message queues from consumer cache according to the topic
+     * Fetch message queues from consumer cache pertaining to the given topic.
      *
      * @param topic message topic
      * @return queue set
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
index 63795a6eeb..74510f4c3e 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
@@ -26,8 +26,7 @@ public interface MessageQueueListener {
     /**
      * @param topic message topic
      * @param mqAll all queues in this message topic
-     * @param mqDivided collection of queues,assigned to the current consumer
+     * @param mqAssigned collection of queues assigned to the current consumer
      */
-    void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
-        final Set<MessageQueue> mqDivided);
+    void messageQueueChanged(final String topic, final Set<MessageQueue> 
mqAll, final Set<MessageQueue> mqAssigned);
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index e57579321c..cfb89b5c88 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.consumer.AckCallback;
 import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.PopCallback;
 import org.apache.rocketmq.client.consumer.PopResult;
@@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
     private long queueMaxSpanFlowControlTimes = 0;
 
     //10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
-    private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 
360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
+    private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 
300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
 
     private static final int MAX_POP_INVISIBLE_TIME = 300000;
     private static final int MIN_POP_INVISIBLE_TIME = 5000;
@@ -1553,4 +1554,11 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
     int[] getPopDelayLevel() {
         return popDelayLevel;
     }
+
+    public MessageQueueListener getMessageQueueListener() {
+        if (null == defaultMQPushConsumer) {
+            return null;
+        }
+        return defaultMQPushConsumer.getMessageQueueListener();
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index df509f3716..f9cf429c69 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -52,7 +53,7 @@ public class RebalancePushImpl extends RebalanceImpl {
 
     @Override
     public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, 
Set<MessageQueue> mqDivided) {
-        /**
+        /*
          * When rebalance result changed, should update subscription's version 
to notify broker.
          * Fix: inconsistency subscription may lead to consumer miss messages.
          */
@@ -82,6 +83,11 @@ public class RebalancePushImpl extends RebalanceImpl {
 
         // notify broker
         this.getmQClientFactory().sendHeartbeatToAllBrokerWithLockV2(true);
+
+        MessageQueueListener messageQueueListener = 
defaultMQPushConsumerImpl.getMessageQueueListener();
+        if (null != messageQueueListener) {
+            messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
+        }
     }
 
     @Override

Reply via email to