This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new bda0ae0 [ISSUE #3724]: Polish the unit test of class ConsumeMessageConcurrentlyService. (#3725) bda0ae0 is described below commit bda0ae0305863cc5fa2e0578439dfa4f4d9908b3 Author: 彭小漪 <644120...@qq.com> AuthorDate: Fri Jan 21 15:30:01 2022 +0800 [ISSUE #3724]: Polish the unit test of class ConsumeMessageConcurrentlyService. (#3725) --- .../ConsumeMessageConcurrentlyServiceTest.java | 25 +++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java index 5d69aa2..7badc3b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java @@ -19,12 +19,16 @@ package org.apache.rocketmq.client.impl.consumer; import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; @@ -36,6 +40,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.stat.ConsumerStatsManager; import org.apache.rocketmq.common.message.MessageClientExt; @@ -45,10 +50,10 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.stats.StatsItem; import org.apache.rocketmq.common.stats.StatsItemSet; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -81,6 +86,13 @@ public class ConsumeMessageConcurrentlyServiceTest { @Before public void init() throws Exception { + ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); + Collection<MQClientInstance> instances = factoryTable.values(); + for (MQClientInstance instance : instances) { + instance.shutdown(); + } + factoryTable.clear(); + consumerGroup = "FooBarGroup" + System.currentTimeMillis(); pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); @@ -100,12 +112,15 @@ public class ConsumeMessageConcurrentlyServiceTest { field.setAccessible(true); field.set(pushConsumerImpl, rebalancePushImpl); pushConsumer.subscribe(topic, "*"); - pushConsumer.start(); - mQClientFactory = spy(pushConsumerImpl.getmQClientFactory()); + // suppress updateTopicRouteInfoFromNameServer + pushConsumer.changeInstanceNameToPID(); + mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)); + mQClientFactory = spy(mQClientFactory); field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); field.set(pushConsumerImpl, mQClientFactory); + factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory); field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); field.setAccessible(true); @@ -117,7 +132,6 @@ public class ConsumeMessageConcurrentlyServiceTest { field.set(pushConsumerImpl, pullAPIWrapper); pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory); - mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl); when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) @@ -140,12 +154,13 @@ public class ConsumeMessageConcurrentlyServiceTest { }); doReturn(new FindBrokerResult("127.0.0.1:10912", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); + doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>(); messageQueueSet.add(createPullRequest().getMessageQueue()); pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); + pushConsumer.start(); } - @Ignore @Test public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception { final CountDownLatch countDownLatch = new CountDownLatch(1);