This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch consumer_aware_queue_change
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/consumer_aware_queue_change by 
this push:
     new 974c765882 add unit test for 
DefaultMQPushConsumer#setMessageQueueListener
974c765882 is described below

commit 974c7658828fb70028de6ecdd2966b3ad638a2ed
Author: Li Zhanhui <lizhan...@gmail.com>
AuthorDate: Fri Nov 10 15:42:23 2023 +0800

    add unit test for DefaultMQPushConsumer#setMessageQueueListener
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 .../consumer/balance/NormalMsgDynamicBalanceIT.java  | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
index b2c9b06589..9bcd2a5ce2 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
@@ -17,6 +17,11 @@
 
 package org.apache.rocketmq.test.client.consumer.balance;
 
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.test.base.BaseConf;
@@ -112,4 +117,19 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
                 consumer2.getListener().getAllUndupMsgBody()).size());
         assertThat(balance).isEqualTo(true);
     }
+
+    @Test
+    public void testMessageQueueListener() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*", 
new RMQNormalListener());
+        // Register message queue listener
+        consumer1.getConsumer().setMessageQueueListener((topic, mqAll, 
mqAssigned) -> latch.countDown());
+
+        // Without message queue listener
+        RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, 
consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListener());
+
+        Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+    }
 }

Reply via email to