This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch consumer_aware_queue_change in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/consumer_aware_queue_change by this push:
     new 974c765882 add unit test for DefaultMQPushConsumer#setMessageQueueListener
974c765882 is described below

commit 974c7658828fb70028de6ecdd2966b3ad638a2ed
Author: Li Zhanhui <lizhan...@gmail.com>
AuthorDate: Fri Nov 10 15:42:23 2023 +0800

    add unit test for DefaultMQPushConsumer#setMessageQueueListener

    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 .../consumer/balance/NormalMsgDynamicBalanceIT.java | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
index b2c9b06589..9bcd2a5ce2 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
@@ -17,6 +17,11 @@
 
 package org.apache.rocketmq.test.client.consumer.balance;
 
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.test.base.BaseConf;
@@ -112,4 +117,19 @@ public class NormalMsgDynamicBalanceIT extends BaseConf {
             consumer2.getListener().getAllUndupMsgBody()).size());
         assertThat(balance).isEqualTo(true);
     }
+
+    @Test
+    public void testMessageQueueListener() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*", new RMQNormalListener());
+        // Register message queue listener
+        consumer1.getConsumer().setMessageQueueListener((topic, mqAll, mqAssigned) -> latch.countDown());
+
+        // Without message queue listener
+        RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListener());
+
+        Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+    }
 }