This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d9d53d58cf add some tests for nameserver (#8349)
d9d53d58cf is described below

commit d9d53d58cf7f32143485f12d4851d5c119d0855a
Author: Tan Xiang <82364837+tanxiang...@users.noreply.github.com>
AuthorDate: Thu Jul 18 14:22:34 2024 +0800

    add some tests for nameserver (#8349)
---
 .../rocketmq/namesrv/route/ZoneRouteRPCHook.java   |   7 +-
 .../namesrv/processor/RequestProcessorTest.java    |  36 +++++
 .../namesrv/route/ZoneRouteRPCHookTest.java        | 164 +++++++++++++++++++++
 .../namesrv/routeinfo/RouteInfoManagerNewTest.java |  69 ++++++++-
 4 files changed, 269 insertions(+), 7 deletions(-)

diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
index 4983c88c8a..a740a0f1b4 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
@@ -56,7 +56,6 @@ public class ZoneRouteRPCHook implements RPCHook {
             return;
         }
         TopicRouteData topicRouteData = 
RemotingSerializable.decode(response.getBody(), TopicRouteData.class);
-
         response.setBody(filterByZoneName(topicRouteData, zoneName).encode());
     }
 
@@ -64,6 +63,9 @@ public class ZoneRouteRPCHook implements RPCHook {
         List<BrokerData> brokerDataReserved = new ArrayList<>();
         Map<String, BrokerData> brokerDataRemoved = new HashMap<>();
         for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
+            if (brokerData.getBrokerAddrs() == null) {
+                continue;
+            }
             //master down, consume from slave. break nearby route rule.
             if (brokerData.getBrokerAddrs().get(MixAll.MASTER_ID) == null
                 || StringUtils.equalsIgnoreCase(brokerData.getZoneName(), 
zoneName)) {
@@ -85,9 +87,6 @@ public class ZoneRouteRPCHook implements RPCHook {
         if (topicRouteData.getFilterServerTable() != null && 
!topicRouteData.getFilterServerTable().isEmpty()) {
             for (Entry<String, BrokerData> entry : 
brokerDataRemoved.entrySet()) {
                 BrokerData brokerData = entry.getValue();
-                if (brokerData.getBrokerAddrs() == null) {
-                    continue;
-                }
                 brokerData.getBrokerAddrs().values()
                     .forEach(brokerAddr -> 
topicRouteData.getFilterServerTable().remove(brokerAddr));
             }
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
index 2b2cf62949..831558a0f6 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
@@ -448,6 +448,42 @@ public class RequestProcessorTest {
         assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
+    @Test
+    public void testQueryDataVersion()throws RemotingCommandException {
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(null);
+        RemotingCommand request = 
getRemotingCommand(RequestCode.QUERY_DATA_VERSION);
+        RemotingCommand remotingCommand = 
defaultRequestProcessor.processRequest(ctx, request);
+        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testGetBrokerMemberBroker() throws RemotingCommandException {
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(null);
+        RemotingCommand request = 
getRemotingCommand(RequestCode.GET_BROKER_MEMBER_GROUP);
+        RemotingCommand remotingCommand = 
defaultRequestProcessor.processRequest(ctx, request);
+        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testBrokerHeartBeat() throws RemotingCommandException {
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(null);
+        RemotingCommand request = 
getRemotingCommand(RequestCode.BROKER_HEARTBEAT);
+        RemotingCommand remotingCommand = 
defaultRequestProcessor.processRequest(ctx, request);
+        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testAddWritePermOfBroker() throws RemotingCommandException {
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.channel()).thenReturn(null);
+        RemotingCommand request = 
getRemotingCommand(RequestCode.ADD_WRITE_PERM_OF_BROKER);
+        RemotingCommand remotingCommand = 
defaultRequestProcessor.processRequest(ctx, request);
+        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
     @Test
     public void testWipeWritePermOfBroker() throws RemotingCommandException {
         ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHookTest.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHookTest.java
new file mode 100644
index 0000000000..1bf4a6c677
--- /dev/null
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHookTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.route;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+public class ZoneRouteRPCHookTest {
+
+    private ZoneRouteRPCHook zoneRouteRPCHook;
+
+    @Before
+    public void setup() {
+        zoneRouteRPCHook = new ZoneRouteRPCHook();
+    }
+
+    @Test
+    public void testDoAfterResponseWithNoZoneMode() {
+        RemotingCommand request1 = 
RemotingCommand.createRequestCommand(106,null);
+        zoneRouteRPCHook.doAfterResponse("", request1, null);
+
+        HashMap<String, String> extFields = new HashMap<>();
+        extFields.put(MixAll.ZONE_MODE, "false");
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(105,null);
+        request.setExtFields(extFields);
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        
response.setBody(RemotingSerializable.encode(createSampleTopicRouteData()));
+        zoneRouteRPCHook.doAfterResponse("", request, response);
+    }
+
+    @Test
+    public void testDoAfterResponseWithNoZoneName() {
+        HashMap<String, String> extFields = new HashMap<>();
+        extFields.put(MixAll.ZONE_MODE, "true");
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(105,null);
+        request.setExtFields(extFields);
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        
response.setBody(RemotingSerializable.encode(createSampleTopicRouteData()));
+        zoneRouteRPCHook.doAfterResponse("", request, response);
+    }
+
+    @Test
+    public void testDoAfterResponseWithNoResponse() {
+        HashMap<String, String> extFields = new HashMap<>();
+        extFields.put(MixAll.ZONE_MODE, "true");
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(105,null);
+        request.setExtFields(extFields);
+        zoneRouteRPCHook.doAfterResponse("", request, null);
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        zoneRouteRPCHook.doAfterResponse("", request, response);
+
+        
response.setBody(RemotingSerializable.encode(createSampleTopicRouteData()));
+        response.setCode(ResponseCode.NO_PERMISSION);
+        zoneRouteRPCHook.doAfterResponse("", request, response);
+    }
+
+
+    @Test
+    public void testDoAfterResponseWithValidZoneFiltering() throws Exception {
+        HashMap<String, String> extFields = new HashMap<>();
+        extFields.put(MixAll.ZONE_MODE, "true");
+        extFields.put(MixAll.ZONE_NAME,"zone1");
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(105,null);
+        request.setExtFields(extFields);
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        TopicRouteData topicRouteData = createSampleTopicRouteData();
+        response.setBody(RemotingSerializable.encode(topicRouteData));
+        zoneRouteRPCHook.doAfterResponse("", request, response);
+
+        HashMap<Long,String> brokeraddrs = new HashMap<>();
+        brokeraddrs.put(MixAll.MASTER_ID,"127.0.0.1:10911");
+        topicRouteData.getBrokerDatas().get(0).setBrokerAddrs(brokeraddrs);
+        response.setBody(RemotingSerializable.encode(topicRouteData));
+        zoneRouteRPCHook.doAfterResponse("", request, response);
+
+        topicRouteData.getQueueDatas().add(createQueueData("BrokerB"));
+        HashMap<Long,String> brokeraddrsB = new HashMap<>();
+        brokeraddrsB.put(MixAll.MASTER_ID,"127.0.0.1:10912");
+        BrokerData brokerData1 = 
createBrokerData("BrokerB","zone2",brokeraddrsB);
+        BrokerData brokerData2 = createBrokerData("BrokerC","zone1",null);
+        topicRouteData.getBrokerDatas().add(brokerData1);
+        topicRouteData.getBrokerDatas().add(brokerData2);
+        response.setBody(RemotingSerializable.encode(topicRouteData));
+        zoneRouteRPCHook.doAfterResponse("", request, response);
+
+        topicRouteData.getFilterServerTable().put("127.0.0.1:10911",new 
ArrayList<>());
+        response.setBody(RemotingSerializable.encode(topicRouteData));
+        zoneRouteRPCHook.doAfterResponse("", request, response);
+        Assert.assertEquals(1,RemotingSerializable
+                .decode(response.getBody(), TopicRouteData.class)
+                .getFilterServerTable()
+                .size());
+
+        topicRouteData.getFilterServerTable().put("127.0.0.1:10912",new 
ArrayList<>());
+        response.setBody(RemotingSerializable.encode(topicRouteData));
+        zoneRouteRPCHook.doAfterResponse("", request, response);
+        Assert.assertEquals(1,RemotingSerializable
+                .decode(response.getBody(), TopicRouteData.class)
+                .getFilterServerTable()
+                .size());
+    }
+
+    private TopicRouteData createSampleTopicRouteData() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        List<BrokerData> brokerDatas = new ArrayList<>();
+        BrokerData brokerData = createBrokerData("BrokerA","zone1",new 
HashMap<>());
+        List<QueueData> queueDatas = new ArrayList<>();
+        QueueData queueData = createQueueData("BrokerA");
+        queueDatas.add(queueData);
+        brokerDatas.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDatas);
+        topicRouteData.setQueueDatas(queueDatas);
+        return topicRouteData;
+    }
+
+    private BrokerData createBrokerData(String brokerName,String 
zoneName,HashMap<Long,String> brokerAddrs) {
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName(brokerName);
+        brokerData.setZoneName(zoneName);
+        brokerData.setBrokerAddrs(brokerAddrs);
+        return  brokerData;
+    }
+
+    private QueueData createQueueData(String brokerName) {
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName(brokerName);
+        queueData.setReadQueueNums(8);
+        queueData.setWriteQueueNums(8);
+        queueData.setPerm(6);
+        return queueData;
+    }
+}
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
index b52cf50740..5e58cfc124 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
@@ -416,9 +416,12 @@ public class RouteInfoManagerNewTest {
     }
 
     @Test
-    public void scanNotActiveBroker() {
+    public void scanNotActiveBroker() throws InterruptedException {
         registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), 
"TestTopic");
         routeInfoManager.scanNotActiveBroker();
+        
registerBrokerWithNormalTopicAndExpire(BrokerBasicInfo.defaultBroker(),"TestTopic");
+        Thread.sleep(30000);
+        routeInfoManager.scanNotActiveBroker();
     }
 
     @Test
@@ -589,6 +592,16 @@ public class RouteInfoManagerNewTest {
         
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
     }
 
+    @Test
+    public void onChannelDestroyByBrokerInfo() {
+        registerBroker(BrokerBasicInfo.defaultBroker(), mock(Channel.class), 
null, "TestTopic", "TestTopic1");
+        BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(DEFAULT_CLUSTER, 
DEFAULT_ADDR);
+        routeInfoManager.onChannelDestroy(brokerAddrInfo);
+        await().atMost(Duration.ofSeconds(5)).until(() -> 
routeInfoManager.blockedUnRegisterRequests() == 0);
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
+    }
+
     @Test
     public void switchBrokerRole_ChannelDestroy() {
         final BrokerBasicInfo masterBroker = 
BrokerBasicInfo.defaultBroker().enableActingMaster(false);
@@ -728,6 +741,23 @@ public class RouteInfoManagerNewTest {
         return registerBroker(brokerInfo, mock(Channel.class), 
topicConfigConcurrentHashMap, topics);
     }
 
+    private RegisterBrokerResult 
registerBrokerWithNormalTopicAndExpire(BrokerBasicInfo brokerInfo, String... 
topics) {
+        ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = 
new ConcurrentHashMap<>();
+        TopicConfig baseTopic = new TopicConfig("baseTopic");
+        topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
+        for (final String topic : topics) {
+            TopicConfig topicConfig = new TopicConfig();
+            topicConfig.setWriteQueueNums(8);
+            topicConfig.setTopicName(topic);
+            topicConfig.setPerm(6);
+            topicConfig.setReadQueueNums(8);
+            topicConfig.setOrder(false);
+            topicConfigConcurrentHashMap.put(topic, topicConfig);
+        }
+
+        return registerBrokerWithExpiredTime(brokerInfo, mock(Channel.class), 
topicConfigConcurrentHashMap, topics);
+    }
+
     private RegisterBrokerResult registerBrokerWithOrderTopic(BrokerBasicInfo 
brokerBasicInfo, String... topics) {
         ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = 
new ConcurrentHashMap<>();
 
@@ -785,7 +815,7 @@ public class RouteInfoManagerNewTest {
         topicConfigSerializeWrapper.setDataVersion(brokerInfo.dataVersion);
         
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
 
-        RegisterBrokerResult registerBrokerResult = 
routeInfoManager.registerBroker(
+        return routeInfoManager.registerBroker(
             brokerInfo.clusterName,
             brokerInfo.brokerAddr,
             brokerInfo.brokerName,
@@ -795,7 +825,40 @@ public class RouteInfoManagerNewTest {
             null,
             brokerInfo.enableActingMaster,
             topicConfigSerializeWrapper, new ArrayList<>(), channel);
-        return registerBrokerResult;
+    }
+
+    private RegisterBrokerResult registerBrokerWithExpiredTime(BrokerBasicInfo 
brokerInfo, Channel channel,
+                                                               
ConcurrentMap<String, TopicConfig> topicConfigConcurrentHashMap, String... 
topics) {
+
+        if (topicConfigConcurrentHashMap == null) {
+            topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
+            TopicConfig baseTopic = new TopicConfig("baseTopic");
+            topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), 
baseTopic);
+            for (final String topic : topics) {
+                TopicConfig topicConfig = new TopicConfig();
+                topicConfig.setWriteQueueNums(8);
+                topicConfig.setTopicName(topic);
+                topicConfig.setPerm(6);
+                topicConfig.setReadQueueNums(8);
+                topicConfig.setOrder(false);
+                topicConfigConcurrentHashMap.put(topic, topicConfig);
+            }
+        }
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
+        topicConfigSerializeWrapper.setDataVersion(brokerInfo.dataVersion);
+        
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
+
+        return routeInfoManager.registerBroker(
+                brokerInfo.clusterName,
+                brokerInfo.brokerAddr,
+                brokerInfo.brokerName,
+                brokerInfo.brokerId,
+                brokerInfo.haAddr,
+                "",
+                30000L,
+                brokerInfo.enableActingMaster,
+                topicConfigSerializeWrapper, new ArrayList<>(), channel);
     }
 
     private void registerSingleTopicWithBrokerName(String brokerName, 
String... topics) {

Reply via email to