This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new bd61774b1f [ISSUE #8411] Add more test coverage for 
DefaultMQPushConsumerImpl (#8412)
bd61774b1f is described below

commit bd61774b1fc19f92c8c3a466420d50899ed61d9b
Author: yx9o <yangx_s...@163.com>
AuthorDate: Mon Jul 22 10:05:14 2024 +0800

    [ISSUE #8411] Add more test coverage for DefaultMQPushConsumerImpl (#8412)
    
    * [ISSUE #8411] Add more test coverage for DefaultMQPushConsumerImpl
    
    * Update
    
    * Update
---
 .../consumer/DefaultMQPushConsumerImplTest.java    | 736 ++++++++++++++++++++-
 1 file changed, 719 insertions(+), 17 deletions(-)

diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
index 879bbc593c..68563c0256 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
@@ -17,17 +17,50 @@
 
 package org.apache.rocketmq.client.impl.consumer;
 
-import java.util.List;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.hook.ConsumeMessageContext;
 import org.apache.rocketmq.client.hook.ConsumeMessageHook;
 import org.apache.rocketmq.client.hook.FilterMessageContext;
 import org.apache.rocketmq.client.hook.FilterMessageHook;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.body.ConsumeStatus;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -36,17 +69,85 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class DefaultMQPushConsumerImplTest {
+
     @Mock
     private DefaultMQPushConsumer defaultMQPushConsumer;
 
+    @Mock
+    private MQClientInstance mQClientFactory;
+
+    @Mock
+    private RebalanceImpl rebalanceImpl;
+
+    @Mock
+    private PullAPIWrapper pullAPIWrapper;
+
+    @Mock
+    private PullRequest pullRequest;
+
+    @Mock
+    private PopRequest popRequest;
+
+    @Mock
+    private ProcessQueue processQueue;
+
+    @Mock
+    private PopProcessQueue popProcessQueue;
+
+    @Mock
+    private MQClientAPIImpl mqClientAPIImpl;
+
+    @Mock
+    private OffsetStore offsetStore;
+
+    private DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+
     @Rule
     public ExpectedException thrown = ExpectedException.none();
 
+    private final String defaultKey = "defaultKey";
+
+    private final String defaultTopic = "defaultTopic";
+
+    private final String defaultBroker = "defaultBroker";
+
+    private final String defaultBrokerAddr = "127.0.0.1:10911";
+
+    private final String defaultGroup = "defaultGroup";
+
+    private final long defaultTimeout = 3000L;
 
     @Test
     public void checkConfigTest() throws MQClientException {
@@ -62,19 +163,14 @@ public class DefaultMQPushConsumerImplTest {
         consumer.setConsumeThreadMin(10);
         consumer.setConsumeThreadMax(9);
 
-        consumer.registerMessageListener(new MessageListenerConcurrently() {
-            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
-                ConsumeConcurrentlyContext context) {
-                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-            }
-        });
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, 
context) -> ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
 
         DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new 
DefaultMQPushConsumerImpl(consumer, null);
         defaultMQPushConsumerImpl.start();
     }
 
     @Test
-    public void testHook() throws Exception {
+    public void testHook() {
         DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new 
DefaultMQPushConsumerImpl(defaultMQPushConsumer, null);
         defaultMQPushConsumerImpl.registerConsumeMessageHook(new 
ConsumeMessageHook() {
             @Override
@@ -110,14 +206,10 @@ public class DefaultMQPushConsumerImplTest {
     @Ignore
     @Test
     public void testPush() throws Exception {
-        when(defaultMQPushConsumer.getMessageListener()).thenReturn(new 
MessageListenerConcurrently() {
-            @Override
-            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
-                ConsumeConcurrentlyContext context) {
-                assertThat(msgs).size().isGreaterThan(0);
-                assertThat(context).isNotNull();
-                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-            }
+        
when(defaultMQPushConsumer.getMessageListener()).thenReturn((MessageListenerConcurrently)
 (msgs, context) -> {
+            assertThat(msgs).size().isGreaterThan(0);
+            assertThat(context).isNotNull();
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         });
         DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new 
DefaultMQPushConsumerImpl(defaultMQPushConsumer, null);
         try {
@@ -126,4 +218,614 @@ public class DefaultMQPushConsumerImplTest {
             defaultMQPushConsumerImpl.shutdown();
         }
     }
+
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        MQAdminImpl mqAdminImpl = mock(MQAdminImpl.class);
+        when(mQClientFactory.getMQAdminImpl()).thenReturn(mqAdminImpl);
+        ConsumerStatsManager consumerStatsManager = 
mock(ConsumerStatsManager.class);
+        ConsumeStatus consumeStatus = mock(ConsumeStatus.class);
+        when(consumerStatsManager.consumeStatus(any(), 
any())).thenReturn(consumeStatus);
+        
when(mQClientFactory.getConsumerStatsManager()).thenReturn(consumerStatsManager);
+        
when(mQClientFactory.getPullMessageService()).thenReturn(mock(PullMessageService.class));
+        when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPIImpl);
+        FindBrokerResult findBrokerResult = mock(FindBrokerResult.class);
+        when(findBrokerResult.getBrokerAddr()).thenReturn(defaultBrokerAddr);
+        when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean())).thenReturn(findBrokerResult);
+        Set<MessageQueue> messageQueueSet = 
Collections.singleton(createMessageQueue());
+        ConcurrentMap<String, Set<MessageQueue>> topicMessageQueueMap = new 
ConcurrentHashMap<>();
+        topicMessageQueueMap.put(defaultTopic, messageQueueSet);
+        
when(rebalanceImpl.getTopicSubscribeInfoTable()).thenReturn(topicMessageQueueMap);
+        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new 
ConcurrentHashMap<>();
+        
when(rebalanceImpl.getProcessQueueTable()).thenReturn(processQueueTable);
+        RPCHook rpcHook = mock(RPCHook.class);
+        defaultMQPushConsumerImpl = new 
DefaultMQPushConsumerImpl(defaultMQPushConsumer, rpcHook);
+        defaultMQPushConsumerImpl.setOffsetStore(offsetStore);
+        FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, 
"mQClientFactory", mQClientFactory, true);
+        FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, 
"rebalanceImpl", rebalanceImpl, true);
+        FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, 
"pullAPIWrapper", pullAPIWrapper, true);
+        FilterMessageHook filterMessageHook = mock(FilterMessageHook.class);
+        ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<>();
+        filterMessageHookList.add(filterMessageHook);
+        ConsumeMessageService consumeMessagePopService = 
mock(ConsumeMessageService.class);
+        ConsumeMessageService consumeMessageService = 
mock(ConsumeMessageService.class);
+        FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, 
"filterMessageHookList", filterMessageHookList, true);
+        FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, 
"consumeMessageService", consumeMessageService, true);
+        FieldUtils.writeDeclaredField(defaultMQPushConsumerImpl, 
"consumeMessagePopService", consumeMessagePopService, true);
+        ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new 
ConcurrentHashMap<>();
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTopic(defaultTopic);
+        subscriptionDataMap.put(defaultTopic, subscriptionData);
+        
when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap);
+    }
+
+    @Test
+    public void testFetchSubscribeMessageQueues() throws MQClientException {
+        Set<MessageQueue> actual = 
defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(defaultTopic);
+        assertNotNull(actual);
+        Assert.assertEquals(1, actual.size());
+        MessageQueue next = actual.iterator().next();
+        assertEquals(defaultTopic, next.getTopic());
+        assertEquals(defaultBroker, next.getBrokerName());
+        assertEquals(0, next.getQueueId());
+    }
+
+    @Test
+    public void testEarliestMsgStoreTime() throws MQClientException {
+        assertEquals(0, 
defaultMQPushConsumerImpl.earliestMsgStoreTime(createMessageQueue()));
+    }
+
+    @Test
+    public void testMaxOffset() throws MQClientException {
+        assertEquals(0, 
defaultMQPushConsumerImpl.maxOffset(createMessageQueue()));
+    }
+
+    @Test
+    public void testMinOffset() throws MQClientException {
+        assertEquals(0, 
defaultMQPushConsumerImpl.minOffset(createMessageQueue()));
+    }
+
+    @Test
+    public void testGetOffsetStore() {
+        assertEquals(offsetStore, defaultMQPushConsumerImpl.getOffsetStore());
+    }
+
+    @Test
+    public void testPullMessageWithStateNotOk() {
+        when(pullRequest.getProcessQueue()).thenReturn(processQueue);
+        defaultMQPushConsumerImpl.pullMessage(pullRequest);
+    }
+
+    @Test
+    public void testPullMessageWithIsPause() {
+        when(pullRequest.getProcessQueue()).thenReturn(processQueue);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        defaultMQPushConsumerImpl.setPause(true);
+        defaultMQPushConsumerImpl.pullMessage(pullRequest);
+    }
+
+    @Test
+    public void testPullMessageWithMsgCountFlowControl() {
+        when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2));
+        when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 
1024));
+        TreeMap<Long, MessageExt> treeMap = new TreeMap<>();
+        treeMap.put(1L, new MessageExt());
+        when(processQueue.getMsgTreeMap()).thenReturn(treeMap);
+        when(pullRequest.getProcessQueue()).thenReturn(processQueue);
+        when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(1);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        defaultMQPushConsumerImpl.pullMessage(pullRequest);
+    }
+
+    @Test
+    public void testPullMessageWithMsgSizeFlowControl() {
+        when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2));
+        when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 
1024));
+        TreeMap<Long, MessageExt> treeMap = new TreeMap<>();
+        treeMap.put(1L, new MessageExt());
+        when(processQueue.getMsgTreeMap()).thenReturn(treeMap);
+        when(pullRequest.getProcessQueue()).thenReturn(processQueue);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3);
+        
when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(1);
+        defaultMQPushConsumerImpl.pullMessage(pullRequest);
+    }
+
+    @Test
+    public void testPullMessageWithMaxSpanFlowControl() {
+        when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2));
+        when(processQueue.getMaxSpan()).thenReturn(2L);
+        when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 
1024));
+        TreeMap<Long, MessageExt> treeMap = new TreeMap<>();
+        treeMap.put(1L, new MessageExt());
+        when(processQueue.getMsgTreeMap()).thenReturn(treeMap);
+        when(pullRequest.getProcessQueue()).thenReturn(processQueue);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3);
+        
when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10);
+        defaultMQPushConsumerImpl.pullMessage(pullRequest);
+    }
+
+    @Test
+    public void testPullMessageWithNotLocked() {
+        when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2));
+        when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 
1024));
+        when(pullRequest.getProcessQueue()).thenReturn(processQueue);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        defaultMQPushConsumerImpl.setConsumeOrderly(true);
+        when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3);
+        
when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10);
+        defaultMQPushConsumerImpl.pullMessage(pullRequest);
+    }
+
+    @Test
+    public void testPullMessageWithSubscriptionDataIsNull() {
+        when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2));
+        when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 
1024));
+        when(pullRequest.getMessageQueue()).thenReturn(createMessageQueue());
+        when(pullRequest.getProcessQueue()).thenReturn(processQueue);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3);
+        
when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10);
+        defaultMQPushConsumerImpl.pullMessage(pullRequest);
+    }
+
+    @Test
+    public void testPullMessageWithNoMatchedMsg() throws MQBrokerException, 
RemotingException, InterruptedException, MQClientException {
+        when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2));
+        when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 
1024));
+        when(pullRequest.getMessageQueue()).thenReturn(createMessageQueue());
+        when(pullRequest.getProcessQueue()).thenReturn(processQueue);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3);
+        
when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10);
+        PullResult pullResultMock = mock(PullResult.class);
+        when(pullAPIWrapper.processPullResult(any(MessageQueue.class), 
any(PullResult.class), any(SubscriptionData.class))).thenReturn(pullResultMock);
+        
when(pullResultMock.getPullStatus()).thenReturn(PullStatus.NO_MATCHED_MSG);
+        doAnswer(invocation -> {
+            PullCallback callback = invocation.getArgument(12);
+            PullResult pullResult = mock(PullResult.class);
+            callback.onSuccess(pullResult);
+            return null;
+        }).when(pullAPIWrapper).pullKernelImpl(
+                any(MessageQueue.class),
+                any(),
+                any(),
+                anyLong(),
+                anyLong(),
+                anyInt(),
+                anyInt(),
+                anyInt(),
+                anyLong(),
+                anyLong(),
+                anyLong(),
+                any(CommunicationMode.class),
+                any(PullCallback.class));
+        defaultMQPushConsumerImpl.pullMessage(pullRequest);
+    }
+
+    @Test
+    public void testPullMessageWithOffsetIllegal() throws MQBrokerException, 
RemotingException, InterruptedException, MQClientException {
+        when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2));
+        when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 
1024));
+        when(pullRequest.getMessageQueue()).thenReturn(createMessageQueue());
+        when(pullRequest.getProcessQueue()).thenReturn(processQueue);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3);
+        
when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10);
+        PullResult pullResultMock = mock(PullResult.class);
+        when(pullAPIWrapper.processPullResult(any(MessageQueue.class), 
any(PullResult.class), any(SubscriptionData.class))).thenReturn(pullResultMock);
+        
when(pullResultMock.getPullStatus()).thenReturn(PullStatus.OFFSET_ILLEGAL);
+        doAnswer(invocation -> {
+            PullCallback callback = invocation.getArgument(12);
+            PullResult pullResult = mock(PullResult.class);
+            callback.onSuccess(pullResult);
+            return null;
+        }).when(pullAPIWrapper).pullKernelImpl(
+                any(MessageQueue.class),
+                any(),
+                any(),
+                anyLong(),
+                anyLong(),
+                anyInt(),
+                anyInt(),
+                anyInt(),
+                anyLong(),
+                anyLong(),
+                anyLong(),
+                any(CommunicationMode.class),
+                any(PullCallback.class));
+        defaultMQPushConsumerImpl.pullMessage(pullRequest);
+    }
+
+    @Test
+    public void testPullMessageWithException() throws MQBrokerException, 
RemotingException, InterruptedException, MQClientException {
+        when(processQueue.getMsgCount()).thenReturn(new AtomicLong(2));
+        when(processQueue.getMsgSize()).thenReturn(new AtomicLong(3 * 1024 * 
1024));
+        when(pullRequest.getMessageQueue()).thenReturn(createMessageQueue());
+        when(pullRequest.getProcessQueue()).thenReturn(processQueue);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(3);
+        
when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(10);
+        doAnswer(invocation -> {
+            PullCallback callback = invocation.getArgument(12);
+            callback.onException(new RuntimeException("exception"));
+            return null;
+        }).when(pullAPIWrapper).pullKernelImpl(
+                any(MessageQueue.class),
+                any(),
+                any(),
+                anyLong(),
+                anyLong(),
+                anyInt(),
+                anyInt(),
+                anyInt(),
+                anyLong(),
+                anyLong(),
+                anyLong(),
+                any(CommunicationMode.class),
+                any(PullCallback.class));
+        defaultMQPushConsumerImpl.pullMessage(pullRequest);
+    }
+
+    @Test
+    public void testPopMessageWithFound() throws RemotingException, 
InterruptedException, MQClientException {
+        when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue);
+        when(popRequest.getMessageQueue()).thenReturn(createMessageQueue());
+        when(popRequest.getConsumerGroup()).thenReturn(defaultGroup);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new 
ConcurrentHashMap<>();
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTagsSet(Collections.singleton("*"));
+        subscriptionDataMap.put(defaultTopic, subscriptionData);
+        
when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap);
+        doAnswer(invocation -> {
+            PopCallback callback = invocation.getArgument(5);
+            PopResult popResult = mock(PopResult.class);
+            when(popResult.getPopStatus()).thenReturn(PopStatus.FOUND);
+            
when(popResult.getMsgFoundList()).thenReturn(Collections.singletonList(createMessageExt()));
+            callback.onSuccess(popResult);
+            return null;
+        }).when(pullAPIWrapper).popAsync(
+                any(MessageQueue.class),
+                anyLong(),
+                anyInt(),
+                any(),
+                anyLong(),
+                any(PopCallback.class),
+                anyBoolean(),
+                anyInt(),
+                anyBoolean(),
+                any(),
+                any());
+        defaultMQPushConsumerImpl.popMessage(popRequest);
+    }
+
+    @Test
+    public void testPopMessageWithException() throws RemotingException, 
InterruptedException, MQClientException {
+        when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue);
+        when(popRequest.getMessageQueue()).thenReturn(createMessageQueue());
+        when(popRequest.getConsumerGroup()).thenReturn(defaultGroup);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new 
ConcurrentHashMap<>();
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTagsSet(Collections.singleton("*"));
+        subscriptionDataMap.put(defaultTopic, subscriptionData);
+        
when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap);
+        doAnswer(invocation -> {
+            PopCallback callback = invocation.getArgument(5);
+            callback.onException(new RuntimeException("exception"));
+            return null;
+        }).when(pullAPIWrapper).popAsync(
+                any(MessageQueue.class),
+                anyLong(),
+                anyInt(),
+                any(),
+                anyLong(),
+                any(PopCallback.class),
+                anyBoolean(),
+                anyInt(),
+                anyBoolean(),
+                any(),
+                any());
+        defaultMQPushConsumerImpl.popMessage(popRequest);
+    }
+
+    @Test
+    public void testPopMessageWithNoNewMsg() throws RemotingException, 
InterruptedException, MQClientException {
+        when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue);
+        when(popRequest.getMessageQueue()).thenReturn(createMessageQueue());
+        when(popRequest.getConsumerGroup()).thenReturn(defaultGroup);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new 
ConcurrentHashMap<>();
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTagsSet(Collections.singleton("*"));
+        subscriptionDataMap.put(defaultTopic, subscriptionData);
+        
when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap);
+        doAnswer(invocation -> {
+            PopCallback callback = invocation.getArgument(5);
+            PopResult popResult = mock(PopResult.class);
+            when(popResult.getPopStatus()).thenReturn(PopStatus.NO_NEW_MSG);
+            callback.onSuccess(popResult);
+            return null;
+        }).when(pullAPIWrapper).popAsync(
+                any(MessageQueue.class),
+                anyLong(),
+                anyInt(),
+                any(),
+                anyLong(),
+                any(PopCallback.class),
+                anyBoolean(),
+                anyInt(),
+                anyBoolean(),
+                any(),
+                any());
+        defaultMQPushConsumerImpl.popMessage(popRequest);
+    }
+
+    @Test
+    public void testPopMessageWithPollingFull() throws RemotingException, 
InterruptedException, MQClientException {
+        when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue);
+        when(popRequest.getMessageQueue()).thenReturn(createMessageQueue());
+        when(popRequest.getConsumerGroup()).thenReturn(defaultGroup);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new 
ConcurrentHashMap<>();
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTagsSet(Collections.singleton("*"));
+        subscriptionDataMap.put(defaultTopic, subscriptionData);
+        
when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap);
+        doAnswer(invocation -> {
+            PopCallback callback = invocation.getArgument(5);
+            PopResult popResult = mock(PopResult.class);
+            when(popResult.getPopStatus()).thenReturn(PopStatus.POLLING_FULL);
+            callback.onSuccess(popResult);
+            return null;
+        }).when(pullAPIWrapper).popAsync(any(
+                        MessageQueue.class),
+                anyLong(),
+                anyInt(),
+                any(),
+                anyLong(),
+                any(PopCallback.class),
+                anyBoolean(),
+                anyInt(),
+                anyBoolean(),
+                any(),
+                any());
+        defaultMQPushConsumerImpl.popMessage(popRequest);
+    }
+
+    @Test
+    public void testPopMessageWithStateNotOk() {
+        when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue);
+        defaultMQPushConsumerImpl.popMessage(popRequest);
+    }
+
+    @Test
+    public void testPopMessageWithIsPause() {
+        when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        defaultMQPushConsumerImpl.setPause(true);
+        defaultMQPushConsumerImpl.popMessage(popRequest);
+    }
+
+    @Test
+    public void testPopMessageWithWaiAckMsgCountFlowControl() {
+        when(popProcessQueue.getWaiAckMsgCount()).thenReturn(2);
+        when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue);
+        when(defaultMQPushConsumer.getPopThresholdForQueue()).thenReturn(1);
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        defaultMQPushConsumerImpl.popMessage(popRequest);
+    }
+
+    @Test
+    public void testPopMessageWithSubscriptionDataIsNull() throws 
RemotingException, InterruptedException, MQClientException {
+        when(popProcessQueue.getWaiAckMsgCount()).thenReturn(2);
+        when(popRequest.getPopProcessQueue()).thenReturn(popProcessQueue);
+        when(popRequest.getMessageQueue()).thenReturn(createMessageQueue());
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        when(defaultMQPushConsumer.getPopThresholdForQueue()).thenReturn(3);
+        defaultMQPushConsumerImpl.popMessage(popRequest);
+        verify(pullAPIWrapper).popAsync(any(MessageQueue.class),
+                eq(60000L),
+                eq(0),
+                any(),
+                eq(15000L),
+                any(PopCallback.class),
+                eq(true),
+                eq(0),
+                eq(false),
+                any(),
+                any());
+    }
+
+    @Test
+    public void testQueryMessage() throws InterruptedException, 
MQClientException {
+        assertNull(defaultMQPushConsumerImpl.queryMessage(defaultTopic, 
defaultKey, 1, 0, 1));
+    }
+
+    @Test
+    public void testQueryMessageByUniqKey() throws InterruptedException, 
MQClientException {
+        
assertNull(defaultMQPushConsumerImpl.queryMessageByUniqKey(defaultTopic, 
defaultKey));
+    }
+
+    @Test
+    public void testSendMessageBack() throws InterruptedException, 
MQClientException, MQBrokerException, RemotingException {
+        defaultMQPushConsumerImpl.sendMessageBack(createMessageExt(), 1, 
createMessageQueue());
+        verify(mqClientAPIImpl).consumerSendMessageBack(
+                eq(defaultBrokerAddr),
+                any(),
+                any(MessageExt.class),
+                any(),
+                eq(1),
+                eq(5000L),
+                eq(0));
+    }
+
+    @Test
+    public void testAckAsync() throws MQBrokerException, RemotingException, 
InterruptedException {
+        doAnswer(invocation -> {
+            AckCallback callback = invocation.getArgument(2);
+            AckResult result = mock(AckResult.class);
+            when(result.getStatus()).thenReturn(AckStatus.OK);
+            callback.onSuccess(result);
+            return null;
+        }).when(mqClientAPIImpl).ackMessageAsync(any(),
+                anyLong(),
+                any(AckCallback.class),
+                any(AckMessageRequestHeader.class));
+        defaultMQPushConsumerImpl.ackAsync(createMessageExt(), defaultGroup);
+        verify(mqClientAPIImpl).ackMessageAsync(eq(defaultBrokerAddr),
+                eq(3000L),
+                any(AckCallback.class),
+                any(AckMessageRequestHeader.class));
+    }
+
+    @Test
+    public void testChangePopInvisibleTimeAsync() throws MQBrokerException, 
RemotingException, InterruptedException, MQClientException {
+        AckCallback callback = mock(AckCallback.class);
+        String extraInfo = 
createMessageExt().getProperty(MessageConst.PROPERTY_POP_CK);
+        defaultMQPushConsumerImpl.changePopInvisibleTimeAsync(defaultTopic, 
defaultGroup, extraInfo, defaultTimeout, callback);
+        verify(mqClientAPIImpl).changeInvisibleTimeAsync(eq(defaultBroker),
+                eq(defaultBrokerAddr),
+                any(ChangeInvisibleTimeRequestHeader.class),
+                eq(defaultTimeout),
+                any(AckCallback.class));
+    }
+
+    @Test
+    public void testShutdown() {
+        defaultMQPushConsumerImpl.setServiceState(ServiceState.RUNNING);
+        defaultMQPushConsumerImpl.shutdown();
+        assertEquals(ServiceState.SHUTDOWN_ALREADY, 
defaultMQPushConsumerImpl.getServiceState());
+    }
+
+    @Test
+    public void testSubscribe() throws MQClientException {
+        defaultMQPushConsumerImpl.subscribe(defaultTopic, "fullClassname", 
"filterClassSource");
+        RebalanceImpl actual = defaultMQPushConsumerImpl.getRebalanceImpl();
+        assertEquals(1, actual.getSubscriptionInner().size());
+    }
+
+    @Test
+    public void testSubscribeByMessageSelector() throws MQClientException {
+        MessageSelector messageSelector = mock(MessageSelector.class);
+        defaultMQPushConsumerImpl.subscribe(defaultTopic, messageSelector);
+        RebalanceImpl actual = defaultMQPushConsumerImpl.getRebalanceImpl();
+        assertEquals(1, actual.getSubscriptionInner().size());
+    }
+
+    @Test
+    public void testSuspend() {
+        defaultMQPushConsumerImpl.suspend();
+        assertTrue(defaultMQPushConsumerImpl.isPause());
+    }
+
+    @Test
+    public void testViewMessage() throws InterruptedException, 
MQClientException, MQBrokerException, RemotingException {
+        assertNull(defaultMQPushConsumerImpl.viewMessage(defaultTopic, 
createMessageExt().getMsgId()));
+    }
+
+    @Test
+    public void testResetOffsetByTimeStamp() throws MQClientException {
+        ConcurrentMap<String, SubscriptionData> subscriptionDataMap = new 
ConcurrentHashMap<>();
+        subscriptionDataMap.put(defaultTopic, new SubscriptionData());
+        
when(rebalanceImpl.getSubscriptionInner()).thenReturn(subscriptionDataMap);
+        
defaultMQPushConsumerImpl.resetOffsetByTimeStamp(System.currentTimeMillis());
+        verify(mQClientFactory).resetOffset(eq(defaultTopic), any(), any());
+    }
+
+    @Test
+    public void testSearchOffset() throws MQClientException {
+        assertEquals(0, 
defaultMQPushConsumerImpl.searchOffset(createMessageQueue(), 
System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testQueryConsumeTimeSpan() throws InterruptedException, 
MQClientException, MQBrokerException, RemotingException {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        topicRouteData.getBrokerDatas().add(createBrokerData());
+        when(mqClientAPIImpl.getTopicRouteInfoFromNameServer(any(), 
anyLong())).thenReturn(topicRouteData);
+        List<QueueTimeSpan> actual = 
defaultMQPushConsumerImpl.queryConsumeTimeSpan(defaultTopic);
+        assertNotNull(actual);
+        assertEquals(0, actual.size());
+    }
+
+    @Test
+    public void testTryResetPopRetryTopic() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        topicRouteData.getBrokerDatas().add(createBrokerData());
+        MessageExt messageExt = createMessageExt();
+        List<MessageExt> msgs = new ArrayList<>();
+        messageExt.setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + defaultGroup + 
"_" + defaultTopic);
+        msgs.add(messageExt);
+        defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, defaultGroup);
+        assertEquals(defaultTopic, msgs.get(0).getTopic());
+    }
+
+    @Test
+    public void testGetPopDelayLevel() {
+        int[] actual = defaultMQPushConsumerImpl.getPopDelayLevel();
+        int[] expected = new int[]{10, 30, 60, 120, 180, 240, 300, 360, 420, 
480, 540, 600, 1200, 1800, 3600, 7200};
+        assertArrayEquals(expected, actual);
+    }
+
+    @Test
+    public void testGetMessageQueueListener() {
+        assertNull(defaultMQPushConsumerImpl.getMessageQueueListener());
+    }
+
+    @Test
+    public void testConsumerRunningInfo() {
+        ConcurrentMap<MessageQueue, ProcessQueue> processQueueMap = new 
ConcurrentHashMap<>();
+        ConcurrentMap<MessageQueue, PopProcessQueue> popProcessQueueMap = new 
ConcurrentHashMap<>();
+        processQueueMap.put(createMessageQueue(), new ProcessQueue());
+        popProcessQueueMap.put(createMessageQueue(), new PopProcessQueue());
+        when(rebalanceImpl.getProcessQueueTable()).thenReturn(processQueueMap);
+        
when(rebalanceImpl.getPopProcessQueueTable()).thenReturn(popProcessQueueMap);
+        ConsumerRunningInfo actual = 
defaultMQPushConsumerImpl.consumerRunningInfo();
+        assertNotNull(actual);
+        assertEquals(1, actual.getSubscriptionSet().size());
+        assertEquals(defaultTopic, 
actual.getSubscriptionSet().iterator().next().getTopic());
+        assertEquals(1, actual.getMqTable().size());
+        assertEquals(1, actual.getMqPopTable().size());
+        assertEquals(1, actual.getStatusTable().size());
+    }
+
+    private BrokerData createBrokerData() {
+        BrokerData result = new BrokerData();
+        HashMap<Long, String> brokerAddrMap = new HashMap<>();
+        brokerAddrMap.put(MixAll.MASTER_ID, defaultBrokerAddr);
+        result.setBrokerAddrs(brokerAddrMap);
+        result.setBrokerName(defaultBroker);
+        return result;
+    }
+
+    private MessageQueue createMessageQueue() {
+        MessageQueue result = new MessageQueue();
+        result.setQueueId(0);
+        result.setBrokerName(defaultBroker);
+        result.setTopic(defaultTopic);
+        return result;
+    }
+
+    private MessageExt createMessageExt() {
+        MessageExt result = new MessageExt();
+        result.setBody("body".getBytes(StandardCharsets.UTF_8));
+        result.setTopic(defaultTopic);
+        result.setBrokerName(defaultBroker);
+        result.putUserProperty("key", "value");
+        result.getProperties().put(MessageConst.PROPERTY_PRODUCER_GROUP, 
defaultGroup);
+        
result.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, 
"TX1");
+        long curTime = System.currentTimeMillis();
+        result.setBornTimestamp(curTime - 1000);
+        String popProps = String.format("%d %d %d %d %d %s %d %d %d", curTime, 
curTime, curTime, curTime, curTime, defaultBroker, 1, 0L, 1L);
+        result.getProperties().put(MessageConst.PROPERTY_POP_CK, popProps);
+        result.setKeys("keys");
+        result.setTags("*");
+        SocketAddress bornHost = new InetSocketAddress("127.0.0.1", 12911);
+        SocketAddress storeHost = new InetSocketAddress("127.0.0.1", 10911);
+        result.setBornHost(bornHost);
+        result.setStoreHost(storeHost);
+        return result;
+    }
 }


Reply via email to