This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new bd61774b1f [ISSUE #8411] Add more test coverage for DefaultMQPushConsumerImpl (#8412) bd61774b1f is described below commit bd61774b1fc19f92c8c3a466420d50899ed61d9b Author: yx9o <yangx_s...@163.com> AuthorDate: Mon Jul 22 10:05:14 2024 +0800 [ISSUE #8411] Add more test coverage for DefaultMQPushConsumerImpl (#8412) * [ISSUE #8411] Add more test coverage for DefaultMQPushConsumerImpl * Update * Update --- .../consumer/DefaultMQPushConsumerImplTest.java | 736 ++++++++++++++++++++- 1 file changed, 719 insertions(+), 17 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java index 879bbc593c..68563c0256 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java @@ -17,17 +17,50 @@ package org.apache.rocketmq.client.impl.consumer; -import java.util.List; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.client.consumer.AckCallback; +import org.apache.rocketmq.client.consumer.AckResult; +import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.PopCallback; +import org.apache.rocketmq.client.consumer.PopResult; +import org.apache.rocketmq.client.consumer.PopStatus; +import org.apache.rocketmq.client.consumer.PullCallback; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.hook.ConsumeMessageContext; import org.apache.rocketmq.client.hook.ConsumeMessageHook; import org.apache.rocketmq.client.hook.FilterMessageContext; import org.apache.rocketmq.client.hook.FilterMessageHook; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.FindBrokerResult; +import org.apache.rocketmq.client.impl.MQAdminImpl; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.stat.ConsumerStatsManager; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ServiceState; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.protocol.body.ConsumeStatus; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.junit.Assert; +import org.junit.Before; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -36,17 +69,85 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DefaultMQPushConsumerImplTest { + @Mock private DefaultMQPushConsumer defaultMQPushConsumer; + @Mock + private MQClientInstance mQClientFactory; + + @Mock + private RebalanceImpl rebalanceImpl; + + @Mock + private PullAPIWrapper pullAPIWrapper; + + @Mock + private PullRequest pullRequest; + + @Mock + private PopRequest popRequest; + + @Mock + private ProcessQueue processQueue; + + @Mock + private PopProcessQueue popProcessQueue; + + @Mock + private MQClientAPIImpl mqClientAPIImpl; + + @Mock + private OffsetStore offsetStore; + + private DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; + @Rule public ExpectedException thrown = ExpectedException.none(); + private final String defaultKey = "defaultKey"; + + private final String defaultTopic = "defaultTopic"; + + private final String defaultBroker = "defaultBroker"; + + private final String defaultBrokerAddr = "127.0.0.1:10911"; + + private final String defaultGroup = "defaultGroup"; + + private final long defaultTimeout = 3000L; @Test public void checkConfigTest() throws MQClientException { @@ -62,19 +163,14 @@ public class DefaultMQPushConsumerImplTest { consumer.setConsumeThreadMin(10); consumer.setConsumeThreadMax(9); - consumer.registerMessageListener(new MessageListenerConcurrently() { - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - }); + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> ConsumeConcurrentlyStatus.CONSUME_SUCCESS); DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(consumer, null); defaultMQPushConsumerImpl.start(); } @Test - public void testHook() throws Exception { + public void testHook() { DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(defaultMQPushConsumer, null); defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageHook() { @Override @@ -110,14 +206,10 @@ public class DefaultMQPushConsumerImplTest { @Ignore @Test public void testPush() throws Exception { - when(defaultMQPushConsumer.getMessageListener()).thenReturn(new MessageListenerConcurrently() { - @Override - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { - assertThat(msgs).size().isGreaterThan(0); - assertThat(context).isNotNull(); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } + when(defaultMQPushConsumer.getMessageListener()).thenReturn((MessageListenerConcurrently) (msgs, context) -> { + assertThat(msgs).size().isGreaterThan(0); + assertThat(context).isNotNull(); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(defaultMQPushConsumer, null); try { @@ -126,4 +218,614 @@ public class DefaultMQPushConsumerImplTest { defaultMQPushConsumerImpl.shutdown(); } } + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + MQAdminImpl mqAdminImpl = mock(MQAdminImpl.class); + when(mQClientFactory.getMQAdminImpl()).thenReturn(mqAdminImpl); + ConsumerStatsManager consumerStatsManager = mock(ConsumerStatsManager.class); + ConsumeStatus consumeStatus = mock(ConsumeStatus.class); + when(consumerStatsManager.consumeStatus(any(), any())).thenReturn(consumeStatus); + when(mQClientFactory.getConsumerStatsManager()).thenReturn(consumerStatsManager); + when(mQClientFactory.getPullMessageService()).thenReturn(mock(PullMessageService.class)); + when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPIImpl); + FindBrokerResult findBrokerResult = mock(FindBrokerResult.class); + when(findBrokerResult.getBrokerAddr()).thenReturn(defaultBrokerAddr); + when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(findBrokerResult); + Set<MessageQueue> messageQueueSet = Collections.singleton(createMessageQueue()); + ConcurrentMap<String, Set<MessageQueue>> topicMessageQueueMap = new ConcurrentHashMap<>(); + topicMessageQueueMap.put(defaultTopic, messageQueueSet); + when(rebalanceImpl.getTopicSubscribeInfoTable()).thenReturn(topicMessageQueueMap); + ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<>(); + when(rebalanceImpl.getProcessQueueTable()).thenReturn(processQueueTable); + RPCHook rpcHook = mock(RPCHook.class); + defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(defaultMQPushConsumer, rpcHook); + defaultMQPushConsumerImpl.setOffsetStore(offsetStore); + FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, "mQClientFactory", mQClientFactory, true); + FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, "rebalanceImpl", rebalanceImpl, true); + FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, "pullAPIWrapper", pullAPIWrapper, true); + FilterMessageHook filterMessageHook = mock(FilterMessageHook.class); + ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<>(); + filterMessageHookList.add(filterMessageHook); + ConsumeMessageService consumeMessagePopService = mock(ConsumeMessageService.class); + ConsumeMessageService consumeMessageService = mock(ConsumeMessageService.class); + FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, "filterMessageHookList", filterMessageHookList, true); + FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, "consumeMessageService", consumeMessageService, true); + FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, "consumeMessagePopService", consumeMessagePopService, true); + ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new ConcurrentHashMap<>(); + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setTopic(defaultTopic); + subscriptionDataMap.put(defaultTopic, subscriptionData); + when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap); + } + + @Test + public void testFetchSubscribeMessageQueues() throws MQClientException { + Set<MessageQueue> actual = defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(defaultTopic); + assertNotNull(actual); + Assert.assertEquals(1, actual.size()); + MessageQueue next = actual.iterator().next(); + assertEquals(defaultTopic, next.getTopic()); + assertEquals(defaultBroker, next.getBrokerName()); + assertEquals(0, next.getQueueId()); + } + + @Test + public void testEarliestMsgStoreTime() throws MQClientException { + assertEquals(0, defaultMQPushConsumerImpl.earliestMsgStoreTime(createMessageQueue())); + } + + @Test + public void testMaxOffset() throws MQClientException { + assertEquals(0, defaultMQPushConsumerImpl.maxOffset(createMessageQueue())); + } + + @Test + public void testMinOffset() throws MQClientException { + assertEquals(0, defaultMQPushConsumerImpl.minOffset(createMessageQueue())); + } + + @Test + public void testGetOffsetStore() { + assertEquals(offsetStore, defaultMQPushConsumerImpl.getOffsetStore()); + } + + @Test + public void testPullMessageWithStateNotOk() { + when(pullRequest.getProcessQueue()).thenReturn(processQueue); + defaultMQPushConsumerImpl.pullMessage(pullRequest); + } + + @Test + public void testPullMessageWithIsPause() { + when(pullRequest.getProcessQueue()).thenReturn(processQueue); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + defaultMQPushConsumerImpl.setPause(true); + defaultMQPushConsumerImpl.pullMessage(pullRequest); + } + + @Test + public void testPullMessageWithMsgCountFlowControl() { + when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2)); + when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 1024)); + TreeMap<Long, MessageExt> treeMap = new TreeMap<>(); + treeMap.put(1L, new MessageExt()); + when(processQueue.getMsgTreeMap()).thenReturn(treeMap); + when(pullRequest.getProcessQueue()).thenReturn(processQueue); + when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(1); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + defaultMQPushConsumerImpl.pullMessage(pullRequest); + } + + @Test + public void testPullMessageWithMsgSizeFlowControl() { + when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2)); + when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 1024)); + TreeMap<Long, MessageExt> treeMap = new TreeMap<>(); + treeMap.put(1L, new MessageExt()); + when(processQueue.getMsgTreeMap()).thenReturn(treeMap); + when(pullRequest.getProcessQueue()).thenReturn(processQueue); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3); + when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(1); + defaultMQPushConsumerImpl.pullMessage(pullRequest); + } + + @Test + public void testPullMessageWithMaxSpanFlowControl() { + when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2)); + when(processQueue.getMaxSpan()).thenReturn(2L); + when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 1024)); + TreeMap<Long, MessageExt> treeMap = new TreeMap<>(); + treeMap.put(1L, new MessageExt()); + when(processQueue.getMsgTreeMap()).thenReturn(treeMap); + when(pullRequest.getProcessQueue()).thenReturn(processQueue); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3); + when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10); + defaultMQPushConsumerImpl.pullMessage(pullRequest); + } + + @Test + public void testPullMessageWithNotLocked() { + when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2)); + when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 1024)); + when(pullRequest.getProcessQueue()).thenReturn(processQueue); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + defaultMQPushConsumerImpl.setConsumeOrderly(true); + when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3); + when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10); + defaultMQPushConsumerImpl.pullMessage(pullRequest); + } + + @Test + public void testPullMessageWithSubscriptionDataIsNull() { + when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2)); + when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 1024)); + when(pullRequest.getMessageQueue()).thenReturn(createMessageQueue()); + when(pullRequest.getProcessQueue()).thenReturn(processQueue); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3); + when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10); + defaultMQPushConsumerImpl.pullMessage(pullRequest); + } + + @Test + public void testPullMessageWithNoMatchedMsg() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2)); + when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 1024)); + when(pullRequest.getMessageQueue()).thenReturn(createMessageQueue()); + when(pullRequest.getProcessQueue()).thenReturn(processQueue); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3); + when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10); + PullResult pullResultMock = mock(PullResult.class); + when(pullAPIWrapper.processPullResult(any(MessageQueue.class), any(PullResult.class), any(SubscriptionData.class))).thenReturn(pullResultMock); + when(pullResultMock.getPullStatus()).thenReturn(PullStatus.NO_MATCHED_MSG); + doAnswer(invocation -> { + PullCallback callback = invocation.getArgument(12); + PullResult pullResult = mock(PullResult.class); + callback.onSuccess(pullResult); + return null; + }).when(pullAPIWrapper).pullKernelImpl( + any(MessageQueue.class), + any(), + any(), + anyLong(), + anyLong(), + anyInt(), + anyInt(), + anyInt(), + anyLong(), + anyLong(), + anyLong(), + any(CommunicationMode.class), + any(PullCallback.class)); + defaultMQPushConsumerImpl.pullMessage(pullRequest); + } + + @Test + public void testPullMessageWithOffsetIllegal() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2)); + when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 1024)); + when(pullRequest.getMessageQueue()).thenReturn(createMessageQueue()); + when(pullRequest.getProcessQueue()).thenReturn(processQueue); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3); + when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10); + PullResult pullResultMock = mock(PullResult.class); + when(pullAPIWrapper.processPullResult(any(MessageQueue.class), any(PullResult.class), any(SubscriptionData.class))).thenReturn(pullResultMock); + when(pullResultMock.getPullStatus()).thenReturn(PullStatus.OFFSET_ILLEGAL); + doAnswer(invocation -> { + PullCallback callback = invocation.getArgument(12); + PullResult pullResult = mock(PullResult.class); + callback.onSuccess(pullResult); + return null; + }).when(pullAPIWrapper).pullKernelImpl( + any(MessageQueue.class), + any(), + any(), + anyLong(), + anyLong(), + anyInt(), + anyInt(), + anyInt(), + anyLong(), + anyLong(), + anyLong(), + any(CommunicationMode.class), + any(PullCallback.class)); + defaultMQPushConsumerImpl.pullMessage(pullRequest); + } + + @Test + public void testPullMessageWithException() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2)); + when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 1024)); + when(pullRequest.getMessageQueue()).thenReturn(createMessageQueue()); + when(pullRequest.getProcessQueue()).thenReturn(processQueue); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3); + when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10); + doAnswer(invocation -> { + PullCallback callback = invocation.getArgument(12); + callback.onException(new RuntimeException("exception")); + return null; + }).when(pullAPIWrapper).pullKernelImpl( + any(MessageQueue.class), + any(), + any(), + anyLong(), + anyLong(), + anyInt(), + anyInt(), + anyInt(), + anyLong(), + anyLong(), + anyLong(), + any(CommunicationMode.class), + any(PullCallback.class)); + defaultMQPushConsumerImpl.pullMessage(pullRequest); + } + + @Test + public void testPopMessageWithFound() throws RemotingException, InterruptedException, MQClientException { + when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue); + when(popRequest.getMessageQueue()).thenReturn(createMessageQueue()); + when(popRequest.getConsumerGroup()).thenReturn(defaultGroup); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new ConcurrentHashMap<>(); + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setTagsSet(Collections.singleton("*")); + subscriptionDataMap.put(defaultTopic, subscriptionData); + when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap); + doAnswer(invocation -> { + PopCallback callback = invocation.getArgument(5); + PopResult popResult = mock(PopResult.class); + when(popResult.getPopStatus()).thenReturn(PopStatus.FOUND); + when(popResult.getMsgFoundList()).thenReturn(Collections.singletonList(createMessageExt())); + callback.onSuccess(popResult); + return null; + }).when(pullAPIWrapper).popAsync( + any(MessageQueue.class), + anyLong(), + anyInt(), + any(), + anyLong(), + any(PopCallback.class), + anyBoolean(), + anyInt(), + anyBoolean(), + any(), + any()); + defaultMQPushConsumerImpl.popMessage(popRequest); + } + + @Test + public void testPopMessageWithException() throws RemotingException, InterruptedException, MQClientException { + when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue); + when(popRequest.getMessageQueue()).thenReturn(createMessageQueue()); + when(popRequest.getConsumerGroup()).thenReturn(defaultGroup); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new ConcurrentHashMap<>(); + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setTagsSet(Collections.singleton("*")); + subscriptionDataMap.put(defaultTopic, subscriptionData); + when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap); + doAnswer(invocation -> { + PopCallback callback = invocation.getArgument(5); + callback.onException(new RuntimeException("exception")); + return null; + }).when(pullAPIWrapper).popAsync( + any(MessageQueue.class), + anyLong(), + anyInt(), + any(), + anyLong(), + any(PopCallback.class), + anyBoolean(), + anyInt(), + anyBoolean(), + any(), + any()); + defaultMQPushConsumerImpl.popMessage(popRequest); + } + + @Test + public void testPopMessageWithNoNewMsg() throws RemotingException, InterruptedException, MQClientException { + when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue); + when(popRequest.getMessageQueue()).thenReturn(createMessageQueue()); + when(popRequest.getConsumerGroup()).thenReturn(defaultGroup); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new ConcurrentHashMap<>(); + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setTagsSet(Collections.singleton("*")); + subscriptionDataMap.put(defaultTopic, subscriptionData); + when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap); + doAnswer(invocation -> { + PopCallback callback = invocation.getArgument(5); + PopResult popResult = mock(PopResult.class); + when(popResult.getPopStatus()).thenReturn(PopStatus.NO_NEW_MSG); + callback.onSuccess(popResult); + return null; + }).when(pullAPIWrapper).popAsync( + any(MessageQueue.class), + anyLong(), + anyInt(), + any(), + anyLong(), + any(PopCallback.class), + anyBoolean(), + anyInt(), + anyBoolean(), + any(), + any()); + defaultMQPushConsumerImpl.popMessage(popRequest); + } + + @Test + public void testPopMessageWithPollingFull() throws RemotingException, InterruptedException, MQClientException { + when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue); + when(popRequest.getMessageQueue()).thenReturn(createMessageQueue()); + when(popRequest.getConsumerGroup()).thenReturn(defaultGroup); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new ConcurrentHashMap<>(); + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setTagsSet(Collections.singleton("*")); + subscriptionDataMap.put(defaultTopic, subscriptionData); + when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap); + doAnswer(invocation -> { + PopCallback callback = invocation.getArgument(5); + PopResult popResult = mock(PopResult.class); + when(popResult.getPopStatus()).thenReturn(PopStatus.POLLING_FULL); + callback.onSuccess(popResult); + return null; + }).when(pullAPIWrapper).popAsync(any( + MessageQueue.class), + anyLong(), + anyInt(), + any(), + anyLong(), + any(PopCallback.class), + anyBoolean(), + anyInt(), + anyBoolean(), + any(), + any()); + defaultMQPushConsumerImpl.popMessage(popRequest); + } + + @Test + public void testPopMessageWithStateNotOk() { + when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue); + defaultMQPushConsumerImpl.popMessage(popRequest); + } + + @Test + public void testPopMessageWithIsPause() { + when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + defaultMQPushConsumerImpl.setPause(true); + defaultMQPushConsumerImpl.popMessage(popRequest); + } + + @Test + public void testPopMessageWithWaiAckMsgCountFlowControl() { + when(popProcessQueue.getWaiAckMsgCount()).thenReturn(2); + when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue); + when(defaultMQPushConsumer.getPopThresholdForQueue()).thenReturn(1); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + defaultMQPushConsumerImpl.popMessage(popRequest); + } + + @Test + public void testPopMessageWithSubscriptionDataIsNull() throws RemotingException, InterruptedException, MQClientException { + when(popProcessQueue.getWaiAckMsgCount()).thenReturn(2); + when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue); + when(popRequest.getMessageQueue()).thenReturn(createMessageQueue()); + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + when(defaultMQPushConsumer.getPopThresholdForQueue()).thenReturn(3); + defaultMQPushConsumerImpl.popMessage(popRequest); + verify(pullAPIWrapper).popAsync(any(MessageQueue.class), + eq(60000L), + eq(0), + any(), + eq(15000L), + any(PopCallback.class), + eq(true), + eq(0), + eq(false), + any(), + any()); + } + + @Test + public void testQueryMessage() throws InterruptedException, MQClientException { + assertNull(defaultMQPushConsumerImpl.queryMessage(defaultTopic, defaultKey, 1, 0, 1)); + } + + @Test + public void testQueryMessageByUniqKey() throws InterruptedException, MQClientException { + assertNull(defaultMQPushConsumerImpl.queryMessageByUniqKey(defaultTopic, defaultKey)); + } + + @Test + public void testSendMessageBack() throws InterruptedException, MQClientException, MQBrokerException, RemotingException { + defaultMQPushConsumerImpl.sendMessageBack(createMessageExt(), 1, createMessageQueue()); + verify(mqClientAPIImpl).consumerSendMessageBack( + eq(defaultBrokerAddr), + any(), + any(MessageExt.class), + any(), + eq(1), + eq(5000L), + eq(0)); + } + + @Test + public void testAckAsync() throws MQBrokerException, RemotingException, InterruptedException { + doAnswer(invocation -> { + AckCallback callback = invocation.getArgument(2); + AckResult result = mock(AckResult.class); + when(result.getStatus()).thenReturn(AckStatus.OK); + callback.onSuccess(result); + return null; + }).when(mqClientAPIImpl).ackMessageAsync(any(), + anyLong(), + any(AckCallback.class), + any(AckMessageRequestHeader.class)); + defaultMQPushConsumerImpl.ackAsync(createMessageExt(), defaultGroup); + verify(mqClientAPIImpl).ackMessageAsync(eq(defaultBrokerAddr), + eq(3000L), + any(AckCallback.class), + any(AckMessageRequestHeader.class)); + } + + @Test + public void testChangePopInvisibleTimeAsync() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + AckCallback callback = mock(AckCallback.class); + String extraInfo = createMessageExt().getProperty(MessageConst.PROPERTY_POP_CK); + defaultMQPushConsumerImpl.changePopInvisibleTimeAsync(defaultTopic, defaultGroup, extraInfo, defaultTimeout, callback); + verify(mqClientAPIImpl).changeInvisibleTimeAsync(eq(defaultBroker), + eq(defaultBrokerAddr), + any(ChangeInvisibleTimeRequestHeader.class), + eq(defaultTimeout), + any(AckCallback.class)); + } + + @Test + public void testShutdown() { + defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING); + defaultMQPushConsumerImpl.shutdown(); + assertEquals(ServiceState.SHUTDOWN_ALREADY, defaultMQPushConsumerImpl.getServiceState()); + } + + @Test + public void testSubscribe() throws MQClientException { + defaultMQPushConsumerImpl.subscribe(defaultTopic, "fullClassname", "filterClassSource"); + RebalanceImpl actual = defaultMQPushConsumerImpl.getRebalanceImpl(); + assertEquals(1, actual.getSubscriptionInner().size()); + } + + @Test + public void testSubscribeByMessageSelector() throws MQClientException { + MessageSelector messageSelector = mock(MessageSelector.class); + defaultMQPushConsumerImpl.subscribe(defaultTopic, messageSelector); + RebalanceImpl actual = defaultMQPushConsumerImpl.getRebalanceImpl(); + assertEquals(1, actual.getSubscriptionInner().size()); + } + + @Test + public void testSuspend() { + defaultMQPushConsumerImpl.suspend(); + assertTrue(defaultMQPushConsumerImpl.isPause()); + } + + @Test + public void testViewMessage() throws InterruptedException, MQClientException, MQBrokerException, RemotingException { + assertNull(defaultMQPushConsumerImpl.viewMessage(defaultTopic, createMessageExt().getMsgId())); + } + + @Test + public void testResetOffsetByTimeStamp() throws MQClientException { + ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new ConcurrentHashMap<>(); + subscriptionDataMap.put(defaultTopic, new SubscriptionData()); + when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap); + defaultMQPushConsumerImpl.resetOffsetByTimeStamp(System.currentTimeMillis()); + verify(mQClientFactory).resetOffset(eq(defaultTopic), any(), any()); + } + + @Test + public void testSearchOffset() throws MQClientException { + assertEquals(0, defaultMQPushConsumerImpl.searchOffset(createMessageQueue(), System.currentTimeMillis())); + } + + @Test + public void testQueryConsumeTimeSpan() throws InterruptedException, MQClientException, MQBrokerException, RemotingException { + TopicRouteData topicRouteData = new TopicRouteData(); + topicRouteData.getBrokerDatas().add(createBrokerData()); + when(mqClientAPIImpl.getTopicRouteInfoFromNameServer(any(), anyLong())).thenReturn(topicRouteData); + List<QueueTimeSpan> actual = defaultMQPushConsumerImpl.queryConsumeTimeSpan(defaultTopic); + assertNotNull(actual); + assertEquals(0, actual.size()); + } + + @Test + public void testTryResetPopRetryTopic() { + TopicRouteData topicRouteData = new TopicRouteData(); + topicRouteData.getBrokerDatas().add(createBrokerData()); + MessageExt messageExt = createMessageExt(); + List<MessageExt> msgs = new ArrayList<>(); + messageExt.setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + defaultGroup + "_" + defaultTopic); + msgs.add(messageExt); + defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, defaultGroup); + assertEquals(defaultTopic, msgs.get(0).getTopic()); + } + + @Test + public void testGetPopDelayLevel() { + int[] actual = defaultMQPushConsumerImpl.getPopDelayLevel(); + int[] expected = new int[]{10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200}; + assertArrayEquals(expected, actual); + } + + @Test + public void testGetMessageQueueListener() { + assertNull(defaultMQPushConsumerImpl.getMessageQueueListener()); + } + + @Test + public void testConsumerRunningInfo() { + ConcurrentMap<MessageQueue, ProcessQueue> processQueueMap = new ConcurrentHashMap<>(); + ConcurrentMap<MessageQueue, PopProcessQueue> popProcessQueueMap = new ConcurrentHashMap<>(); + processQueueMap.put(createMessageQueue(), new ProcessQueue()); + popProcessQueueMap.put(createMessageQueue(), new PopProcessQueue()); + when(rebalanceImpl.getProcessQueueTable()).thenReturn(processQueueMap); + when(rebalanceImpl.getPopProcessQueueTable()).thenReturn(popProcessQueueMap); + ConsumerRunningInfo actual = defaultMQPushConsumerImpl.consumerRunningInfo(); + assertNotNull(actual); + assertEquals(1, actual.getSubscriptionSet().size()); + assertEquals(defaultTopic, actual.getSubscriptionSet().iterator().next().getTopic()); + assertEquals(1, actual.getMqTable().size()); + assertEquals(1, actual.getMqPopTable().size()); + assertEquals(1, actual.getStatusTable().size()); + } + + private BrokerData createBrokerData() { + BrokerData result = new BrokerData(); + HashMap<Long, String> brokerAddrMap = new HashMap<>(); + brokerAddrMap.put(MixAll.MASTER_ID, defaultBrokerAddr); + result.setBrokerAddrs(brokerAddrMap); + result.setBrokerName(defaultBroker); + return result; + } + + private MessageQueue createMessageQueue() { + MessageQueue result = new MessageQueue(); + result.setQueueId(0); + result.setBrokerName(defaultBroker); + result.setTopic(defaultTopic); + return result; + } + + private MessageExt createMessageExt() { + MessageExt result = new MessageExt(); + result.setBody("body".getBytes(StandardCharsets.UTF_8)); + result.setTopic(defaultTopic); + result.setBrokerName(defaultBroker); + result.putUserProperty("key", "value"); + result.getProperties().put(MessageConst.PROPERTY_PRODUCER_GROUP, defaultGroup); + result.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "TX1"); + long curTime = System.currentTimeMillis(); + result.setBornTimestamp(curTime - 1000); + String popProps = String.format("%d %d %d %d %d %s %d %d %d", curTime, curTime, curTime, curTime, curTime, defaultBroker, 1, 0L, 1L); + result.getProperties().put(MessageConst.PROPERTY_POP_CK, popProps); + result.setKeys("keys"); + result.setTags("*"); + SocketAddress bornHost = new InetSocketAddress("127.0.0.1", 12911); + SocketAddress storeHost = new InetSocketAddress("127.0.0.1", 10911); + result.setBornHost(bornHost); + result.setStoreHost(storeHost); + return result; + } }