This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 15d32db03b [ISSUE #7547] Let consumer be aware of message queue 
assignment change (#7548)
15d32db03b is described below

commit 15d32db03b20b130473271b182c9ba32cc8048cb
Author: Zhanhui Li <lizhan...@apache.org>
AuthorDate: Mon Nov 13 09:44:25 2023 +0800

    [ISSUE #7547] Let consumer be aware of message queue assignment change 
(#7548)
    
    * let consumer be aware of message queue assignment change
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * add unit test for DefaultMQPushConsumer#setMessageQueueListener
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: bazel build warnings
    
    Signed-off-by: Zhanhui Li <lizhan...@apache.org>
    
    * fix: set MixCommitlogTest test size as medium
    
    Signed-off-by: Zhanhui Li <lizhan...@apache.org>
    
    * allow cache bazel test results
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix code style issue by removing unused imports
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix #7552
    
    Signed-off-by: Zhanhui Li <lizhan...@apache.org>
    
    ---------
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    Signed-off-by: Zhanhui Li <lizhan...@apache.org>
---
 .github/workflows/bazel.yml                             |  2 +-
 .../rocketmq/client/consumer/DefaultMQPushConsumer.java | 13 +++++++++++++
 .../org/apache/rocketmq/client/consumer/MQConsumer.java |  8 +++++---
 .../rocketmq/client/consumer/MessageQueueListener.java  |  5 ++---
 .../client/impl/consumer/DefaultMQPushConsumerImpl.java | 10 +++++++++-
 .../client/impl/consumer/RebalancePushImpl.java         |  8 +++++++-
 .../proxy/service/message/LocalRemotingCommand.java     |  1 +
 remoting/BUILD.bazel                                    |  1 +
 store/BUILD.bazel                                       |  1 +
 .../consumer/balance/NormalMsgDynamicBalanceIT.java     | 17 +++++++++++++++++
 tieredstore/BUILD.bazel                                 |  1 +
 tools/BUILD.bazel                                       |  1 +
 12 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/.github/workflows/bazel.yml b/.github/workflows/bazel.yml
index 7652b93048..af674592bb 100644
--- a/.github/workflows/bazel.yml
+++ b/.github/workflows/bazel.yml
@@ -19,4 +19,4 @@ jobs:
       - name: Build
         run: bazel build --config=remote //...
       - name: Run Tests
-        run: bazel test --config=remote --nocache_test_results //...
\ No newline at end of file
+        run: bazel test --config=remote //...
\ No newline at end of file
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 1afb9113eb..e593a17c98 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      */
     private MessageListener messageListener;
 
+    /**
+     * Listener to call if message queue assignment is changed.
+     */
+    private MessageQueueListener messageQueueListener;
+
     /**
      * Offset Storage
      */
@@ -987,4 +992,12 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     public void setClientRebalance(boolean clientRebalance) {
         this.clientRebalance = clientRebalance;
     }
+
+    public MessageQueueListener getMessageQueueListener() {
+        return messageQueueListener;
+    }
+
+    public void setMessageQueueListener(MessageQueueListener 
messageQueueListener) {
+        this.messageQueueListener = messageQueueListener;
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
index f4a8eda23a..81e06ee417 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
@@ -29,20 +29,22 @@ import 
org.apache.rocketmq.remoting.exception.RemotingException;
  */
 public interface MQConsumer extends MQAdmin {
     /**
-     * If consuming failure,message will be send back to the brokers,and delay 
consuming some time
+     * If consuming of messages failed, they will be sent back to the brokers 
for another delivery attempt after
+     * interval specified in delay level.
      */
     @Deprecated
     void sendMessageBack(final MessageExt msg, final int delayLevel) throws 
RemotingException,
         MQBrokerException, InterruptedException, MQClientException;
 
     /**
-     * If consuming failure,message will be send back to the broker,and delay 
consuming some time
+     * If consuming of messages failed, they will be sent back to the brokers 
for another delivery attempt after
+     * interval specified in delay level.
      */
     void sendMessageBack(final MessageExt msg, final int delayLevel, final 
String brokerName)
         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException;
 
     /**
-     * Fetch message queues from consumer cache according to the topic
+     * Fetch message queues from consumer cache pertaining to the given topic.
      *
      * @param topic message topic
      * @return queue set
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
index 63795a6eeb..74510f4c3e 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
@@ -26,8 +26,7 @@ public interface MessageQueueListener {
     /**
      * @param topic message topic
      * @param mqAll all queues in this message topic
-     * @param mqDivided collection of queues,assigned to the current consumer
+     * @param mqAssigned collection of queues, assigned to the current consumer
      */
-    void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
-        final Set<MessageQueue> mqDivided);
+    void messageQueueChanged(final String topic, final Set<MessageQueue> 
mqAll, final Set<MessageQueue> mqAssigned);
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index e57579321c..cfb89b5c88 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.consumer.AckCallback;
 import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.PopCallback;
 import org.apache.rocketmq.client.consumer.PopResult;
@@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
     private long queueMaxSpanFlowControlTimes = 0;
 
     //10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
-    private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 
360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
+    private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 
300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
 
     private static final int MAX_POP_INVISIBLE_TIME = 300000;
     private static final int MIN_POP_INVISIBLE_TIME = 5000;
@@ -1553,4 +1554,11 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
     int[] getPopDelayLevel() {
         return popDelayLevel;
     }
+
+    public MessageQueueListener getMessageQueueListener() {
+        if (null == defaultMQPushConsumer) {
+            return null;
+        }
+        return defaultMQPushConsumer.getMessageQueueListener();
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index df509f3716..f9cf429c69 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -52,7 +53,7 @@ public class RebalancePushImpl extends RebalanceImpl {
 
     @Override
     public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, 
Set<MessageQueue> mqDivided) {
-        /**
+        /*
          * When rebalance result changed, should update subscription's version 
to notify broker.
          * Fix: inconsistency subscription may lead to consumer miss messages.
          */
@@ -82,6 +83,11 @@ public class RebalancePushImpl extends RebalanceImpl {
 
         // notify broker
         this.getmQClientFactory().sendHeartbeatToAllBrokerWithLockV2(true);
+
+        MessageQueueListener messageQueueListener = 
defaultMQPushConsumerImpl.getMessageQueueListener();
+        if (null != messageQueueListener) {
+            messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
+        }
     }
 
     @Override
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
index 915cafcd57..7bf4a16982 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
@@ -32,6 +32,7 @@ public class LocalRemotingCommand extends RemotingCommand {
         cmd.writeCustomHeader(customHeader);
         cmd.setExtFields(new HashMap<>());
         setCmdVersion(cmd);
+        cmd.makeCustomHeaderToNet();
         return cmd;
     }
 
diff --git a/remoting/BUILD.bazel b/remoting/BUILD.bazel
index db8b24301d..072148bc08 100644
--- a/remoting/BUILD.bazel
+++ b/remoting/BUILD.bazel
@@ -65,6 +65,7 @@ java_library(
         "@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
         "@maven//:org_apache_tomcat_annotations_api",
         "@maven//:org_apache_commons_commons_lang3",
+        "@maven//:org_jetbrains_annotations",
     ],
     resources = glob(["src/test/resources/certs/*.pem"]) + 
glob(["src/test/resources/certs/*.key"])
 )
diff --git a/store/BUILD.bazel b/store/BUILD.bazel
index bf594aaa69..4b046c68eb 100644
--- a/store/BUILD.bazel
+++ b/store/BUILD.bazel
@@ -79,6 +79,7 @@ GenTestRules(
         "src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest",
         "src/test/java/org/apache/rocketmq/store/MappedFileQueueTest",
         
"src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest",
+        "src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest",
     ],
     test_files = glob(["src/test/java/**/*Test.java"]),
     deps = [
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
index b2c9b06589..684b718ae5 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.test.client.consumer.balance;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.test.base.BaseConf;
@@ -112,4 +114,19 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
                 consumer2.getListener().getAllUndupMsgBody()).size());
         assertThat(balance).isEqualTo(true);
     }
+
+    @Test
+    public void testMessageQueueListener() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*", 
new RMQNormalListener());
+        // Register message queue listener
+        consumer1.getConsumer().setMessageQueueListener((topic, mqAll, 
mqAssigned) -> latch.countDown());
+
+        // Without message queue listener
+        RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, 
consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListener());
+
+        Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+    }
 }
diff --git a/tieredstore/BUILD.bazel b/tieredstore/BUILD.bazel
index 8b6705ac28..e16fca90d0 100644
--- a/tieredstore/BUILD.bazel
+++ b/tieredstore/BUILD.bazel
@@ -41,6 +41,7 @@ java_library(
         "@maven//:org_apache_tomcat_annotations_api",
         "@maven//:com_alibaba_fastjson",
         "@maven//:org_apache_rocketmq_rocketmq_rocksdb",
+        "@maven//:commons_collections_commons_collections",
     ],
 )
 
diff --git a/tools/BUILD.bazel b/tools/BUILD.bazel
index 9ccc115335..05d88f7b00 100644
--- a/tools/BUILD.bazel
+++ b/tools/BUILD.bazel
@@ -39,6 +39,7 @@ java_library(
         "@maven//:commons_collections_commons_collections",
         "@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
         "@maven//:io_github_aliyunmq_rocketmq_logback_classic",
+        "@maven//:org_apache_rocketmq_rocketmq_rocksdb",
     ],
 )
 

Reply via email to