This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 73d0c33f4f add tests for ConsumerManageProcessor (#8401)
73d0c33f4f is described below

commit 73d0c33f4f1ab5e2b57afa4afc6203000aebeabd
Author: Tan Xiang <82364837+tanxiang...@users.noreply.github.com>
AuthorDate: Thu Jul 18 14:21:09 2024 +0800

    add tests for ConsumerManageProcessor (#8401)
---
 .../processor/ConsumerManageProcessorTest.java     | 186 ++++++++++++++++++++-
 1 file changed, 185 insertions(+), 1 deletion(-)

diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
index c94591d381..6b3c2578af 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
@@ -16,19 +16,36 @@
  */
 package org.apache.rocketmq.broker.processor;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.remoting.protocol.statictopic.LogicQueueMappingItem;
+import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingContext;
+import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.rpc.RpcClient;
+import org.apache.rocketmq.remoting.rpc.RpcException;
+import org.apache.rocketmq.remoting.rpc.RpcResponse;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.Before;
@@ -38,7 +55,17 @@ import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -50,12 +77,24 @@ public class ConsumerManageProcessorTest {
     private BrokerController brokerController = new BrokerController(new 
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new 
MessageStoreConfig());
     @Mock
     private MessageStore messageStore;
+    @Mock
+    private Channel channel;
+    @Mock
+    private ConsumerOffsetManager consumerOffsetManager;
+    @Mock
+    private BrokerOuterAPI brokerOuterAPI;
+    @Mock
+    private RpcClient rpcClient;
+    @Mock
+    private Future<RpcResponse> responseFuture;
+    @Mock
+    private TopicQueueMappingContext mappingContext;
 
     private String topic = "FooBar";
     private String group = "FooBarGroup";
 
     @Before
-    public void init() {
+    public void init() throws RpcException {
         brokerController.setMessageStore(messageStore);
         TopicConfigManager topicConfigManager = new 
TopicConfigManager(brokerController);
         topicConfigManager.getTopicConfigTable().put(topic, new 
TopicConfig(topic));
@@ -64,6 +103,12 @@ public class ConsumerManageProcessorTest {
         subscriptionGroupManager.getSubscriptionGroupTable().put(group, new 
SubscriptionGroupConfig());
         
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
         consumerManageProcessor = new 
ConsumerManageProcessor(brokerController);
+        when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
+        when(brokerOuterAPI.getRpcClient()).thenReturn(rpcClient);
+        when(rpcClient.invoke(any(),anyLong())).thenReturn(responseFuture);
+        TopicQueueMappingDetail topicQueueMappingDetail = new 
TopicQueueMappingDetail();
+        topicQueueMappingDetail.setBname("BrokerA");
+        
when(mappingContext.getMappingDetail()).thenReturn(topicQueueMappingDetail);
     }
 
     @Test
@@ -82,6 +127,145 @@ public class ConsumerManageProcessorTest {
         
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
     }
 
+    @Test
+    public void testUpdateConsumerOffset() throws RemotingCommandException {
+        
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+        
when(consumerOffsetManager.hasOffsetReset(anyString(),anyString(),anyInt())).thenReturn(true);
+        RemotingCommand request = buildUpdateConsumerOffsetRequest(group, 
topic, 0, 0);
+        RemotingCommand response = 
consumerManageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        
when(consumerOffsetManager.hasOffsetReset(anyString(),anyString(),anyInt())).thenReturn(false);
+        response = consumerManageProcessor.processRequest(handlerContext, 
request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testGetConsumerListByGroup() throws RemotingCommandException {
+        GetConsumerListByGroupRequestHeader requestHeader = new 
GetConsumerListByGroupRequestHeader();
+        requestHeader.setConsumerGroup(group);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        RemotingCommand response = 
consumerManageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+        brokerController.getConsumerManager().getConsumerTable().put(group,new 
ConsumerGroupInfo(group));
+        response = consumerManageProcessor.processRequest(handlerContext, 
request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+        ConsumerGroupInfo consumerGroupInfo =
+                
this.brokerController.getConsumerManager().getConsumerGroupInfo(
+                        requestHeader.getConsumerGroup());
+        consumerGroupInfo.getChannelInfoTable().put(channel,new 
ClientChannelInfo(channel));
+        response = consumerManageProcessor.processRequest(handlerContext, 
request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testQueryConsumerOffset() throws RemotingCommandException, 
ExecutionException, InterruptedException {
+        RemotingCommand request = buildQueryConsumerOffsetRequest(group, 
topic, 0, true);
+        RemotingCommand response = 
consumerManageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.QUERY_NOT_FOUND);
+
+        
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+        
when(consumerOffsetManager.queryOffset(anyString(),anyString(),anyInt())).thenReturn(0L);
+        response = consumerManageProcessor.processRequest(handlerContext, 
request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        
when(consumerOffsetManager.queryOffset(anyString(),anyString(),anyInt())).thenReturn(-1L);
+        
when(messageStore.getMinOffsetInQueue(anyString(),anyInt())).thenReturn(-1L);
+        
when(messageStore.checkInMemByConsumeOffset(anyString(),anyInt(),anyLong(),anyInt())).thenReturn(true);
+        response = consumerManageProcessor.processRequest(handlerContext, 
request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        TopicQueueMappingManager topicQueueMappingManager = 
mock(TopicQueueMappingManager.class);
+        
when(brokerController.getTopicQueueMappingManager()).thenReturn(topicQueueMappingManager);
+        
when(topicQueueMappingManager.buildTopicQueueMappingContext(any(QueryConsumerOffsetRequestHeader.class))).thenReturn(mappingContext);
+        response = consumerManageProcessor.processRequest(handlerContext, 
request);
+        assertThat(response).isNotNull();
+        
assertThat(response.getCode()).isEqualTo(ResponseCode.NOT_LEADER_FOR_QUEUE);
+
+        List<LogicQueueMappingItem> items = new ArrayList<>();
+        LogicQueueMappingItem item1 = createLogicQueueMappingItem("BrokerC", 
0, 0L, 0L);
+        items.add(item1);
+        when(mappingContext.getMappingItemList()).thenReturn(items);
+        when(mappingContext.getLeaderItem()).thenReturn(item1);
+        when(mappingContext.getCurrentItem()).thenReturn(item1);
+        when(mappingContext.isLeader()).thenReturn(true);
+        response = consumerManageProcessor.processRequest(handlerContext, 
request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        LogicQueueMappingItem item2 = createLogicQueueMappingItem("BrokerA", 
0, 0L, 0L);
+        items.add(item2);
+        QueryConsumerOffsetResponseHeader queryConsumerOffsetResponseHeader = 
new QueryConsumerOffsetResponseHeader();
+        queryConsumerOffsetResponseHeader.setOffset(0L);
+        RpcResponse rpcResponse = new 
RpcResponse(ResponseCode.SUCCESS,queryConsumerOffsetResponseHeader,null);
+        when(responseFuture.get()).thenReturn(rpcResponse);
+        response = consumerManageProcessor.processRequest(handlerContext, 
request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        queryConsumerOffsetResponseHeader.setOffset(-1L);
+        rpcResponse = new 
RpcResponse(ResponseCode.SUCCESS,queryConsumerOffsetResponseHeader,null);
+        when(responseFuture.get()).thenReturn(rpcResponse);
+        response = consumerManageProcessor.processRequest(handlerContext, 
request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.QUERY_NOT_FOUND);
+    }
+
+    @Test
+    public void testRewriteRequestForStaticTopic() throws RpcException, 
ExecutionException, InterruptedException {
+        UpdateConsumerOffsetRequestHeader requestHeader = new 
UpdateConsumerOffsetRequestHeader();
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(0);
+        requestHeader.setCommitOffset(0L);
+
+        RemotingCommand response = 
consumerManageProcessor.rewriteRequestForStaticTopic(requestHeader, 
mappingContext);
+        assertThat(response).isNotNull();
+        
assertThat(response.getCode()).isEqualTo(ResponseCode.NOT_LEADER_FOR_QUEUE);
+
+        List<LogicQueueMappingItem> items = new ArrayList<>();
+        LogicQueueMappingItem item = createLogicQueueMappingItem("BrokerC", 0, 
0L, 0L);
+        items.add(item);
+        when(mappingContext.getMappingItemList()).thenReturn(items);
+        when(mappingContext.isLeader()).thenReturn(true);
+        RpcResponse rpcResponse = new RpcResponse(ResponseCode.SUCCESS,new 
UpdateConsumerOffsetResponseHeader(),null);
+        when(responseFuture.get()).thenReturn(rpcResponse);
+        response = 
consumerManageProcessor.rewriteRequestForStaticTopic(requestHeader, 
mappingContext);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    public RemotingCommand buildQueryConsumerOffsetRequest(String group, 
String topic, int queueId,boolean setZeroIfNotFound) {
+        QueryConsumerOffsetRequestHeader requestHeader = new 
QueryConsumerOffsetRequestHeader();
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setSetZeroIfNotFound(setZeroIfNotFound);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        return request;
+    }
+
+    public LogicQueueMappingItem createLogicQueueMappingItem(String 
brokerName, int queueId, long startOffset, long logicOffset) {
+        LogicQueueMappingItem item = new LogicQueueMappingItem();
+        item.setBname(brokerName);
+        item.setQueueId(queueId);
+        item.setStartOffset(startOffset);
+        item.setLogicOffset(logicOffset);
+        return item;
+    }
+
     private RemotingCommand buildUpdateConsumerOffsetRequest(String group, 
String topic, int queueId, long offset) {
         UpdateConsumerOffsetRequestHeader requestHeader = new 
UpdateConsumerOffsetRequestHeader();
         requestHeader.setConsumerGroup(group);

Reply via email to