This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new d9d53d58cf add some tests for nameserver (#8349) d9d53d58cf is described below commit d9d53d58cf7f32143485f12d4851d5c119d0855a Author: Tan Xiang <82364837+tanxiang...@users.noreply.github.com> AuthorDate: Thu Jul 18 14:22:34 2024 +0800 add some tests for nameserver (#8349) --- .../rocketmq/namesrv/route/ZoneRouteRPCHook.java | 7 +- .../namesrv/processor/RequestProcessorTest.java | 36 +++++ .../namesrv/route/ZoneRouteRPCHookTest.java | 164 +++++++++++++++++++++ .../namesrv/routeinfo/RouteInfoManagerNewTest.java | 69 ++++++++- 4 files changed, 269 insertions(+), 7 deletions(-) diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java index 4983c88c8a..a740a0f1b4 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java @@ -56,7 +56,6 @@ public class ZoneRouteRPCHook implements RPCHook { return; } TopicRouteData topicRouteData = RemotingSerializable.decode(response.getBody(), TopicRouteData.class); - response.setBody(filterByZoneName(topicRouteData, zoneName).encode()); } @@ -64,6 +63,9 @@ public class ZoneRouteRPCHook implements RPCHook { List<BrokerData> brokerDataReserved = new ArrayList<>(); Map<String, BrokerData> brokerDataRemoved = new HashMap<>(); for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { + if (brokerData.getBrokerAddrs() == null) { + continue; + } //master down, consume from slave. break nearby route rule. if (brokerData.getBrokerAddrs().get(MixAll.MASTER_ID) == null || StringUtils.equalsIgnoreCase(brokerData.getZoneName(), zoneName)) { @@ -85,9 +87,6 @@ public class ZoneRouteRPCHook implements RPCHook { if (topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) { for (Entry<String, BrokerData> entry : brokerDataRemoved.entrySet()) { BrokerData brokerData = entry.getValue(); - if (brokerData.getBrokerAddrs() == null) { - continue; - } brokerData.getBrokerAddrs().values() .forEach(brokerAddr -> topicRouteData.getFilterServerTable().remove(brokerAddr)); } diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java index 2b2cf62949..831558a0f6 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java @@ -448,6 +448,42 @@ public class RequestProcessorTest { assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testQueryDataVersion()throws RemotingCommandException { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(null); + RemotingCommand request = getRemotingCommand(RequestCode.QUERY_DATA_VERSION); + RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request); + assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testGetBrokerMemberBroker() throws RemotingCommandException { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(null); + RemotingCommand request = getRemotingCommand(RequestCode.GET_BROKER_MEMBER_GROUP); + RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request); + assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testBrokerHeartBeat() throws RemotingCommandException { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(null); + RemotingCommand request = getRemotingCommand(RequestCode.BROKER_HEARTBEAT); + RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request); + assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testAddWritePermOfBroker() throws RemotingCommandException { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(null); + RemotingCommand request = getRemotingCommand(RequestCode.ADD_WRITE_PERM_OF_BROKER); + RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request); + assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + @Test public void testWipeWritePermOfBroker() throws RemotingCommandException { ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHookTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHookTest.java new file mode 100644 index 0000000000..1bf4a6c677 --- /dev/null +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHookTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.namesrv.route; + +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.QueueData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + + +public class ZoneRouteRPCHookTest { + + private ZoneRouteRPCHook zoneRouteRPCHook; + + @Before + public void setup() { + zoneRouteRPCHook = new ZoneRouteRPCHook(); + } + + @Test + public void testDoAfterResponseWithNoZoneMode() { + RemotingCommand request1 = RemotingCommand.createRequestCommand(106,null); + zoneRouteRPCHook.doAfterResponse("", request1, null); + + HashMap<String, String> extFields = new HashMap<>(); + extFields.put(MixAll.ZONE_MODE, "false"); + RemotingCommand request = RemotingCommand.createRequestCommand(105,null); + request.setExtFields(extFields); + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setBody(RemotingSerializable.encode(createSampleTopicRouteData())); + zoneRouteRPCHook.doAfterResponse("", request, response); + } + + @Test + public void testDoAfterResponseWithNoZoneName() { + HashMap<String, String> extFields = new HashMap<>(); + extFields.put(MixAll.ZONE_MODE, "true"); + RemotingCommand request = RemotingCommand.createRequestCommand(105,null); + request.setExtFields(extFields); + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setBody(RemotingSerializable.encode(createSampleTopicRouteData())); + zoneRouteRPCHook.doAfterResponse("", request, response); + } + + @Test + public void testDoAfterResponseWithNoResponse() { + HashMap<String, String> extFields = new HashMap<>(); + extFields.put(MixAll.ZONE_MODE, "true"); + RemotingCommand request = RemotingCommand.createRequestCommand(105,null); + request.setExtFields(extFields); + zoneRouteRPCHook.doAfterResponse("", request, null); + + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + zoneRouteRPCHook.doAfterResponse("", request, response); + + response.setBody(RemotingSerializable.encode(createSampleTopicRouteData())); + response.setCode(ResponseCode.NO_PERMISSION); + zoneRouteRPCHook.doAfterResponse("", request, response); + } + + + @Test + public void testDoAfterResponseWithValidZoneFiltering() throws Exception { + HashMap<String, String> extFields = new HashMap<>(); + extFields.put(MixAll.ZONE_MODE, "true"); + extFields.put(MixAll.ZONE_NAME,"zone1"); + RemotingCommand request = RemotingCommand.createRequestCommand(105,null); + request.setExtFields(extFields); + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + TopicRouteData topicRouteData = createSampleTopicRouteData(); + response.setBody(RemotingSerializable.encode(topicRouteData)); + zoneRouteRPCHook.doAfterResponse("", request, response); + + HashMap<Long,String> brokeraddrs = new HashMap<>(); + brokeraddrs.put(MixAll.MASTER_ID,"127.0.0.1:10911"); + topicRouteData.getBrokerDatas().get(0).setBrokerAddrs(brokeraddrs); + response.setBody(RemotingSerializable.encode(topicRouteData)); + zoneRouteRPCHook.doAfterResponse("", request, response); + + topicRouteData.getQueueDatas().add(createQueueData("BrokerB")); + HashMap<Long,String> brokeraddrsB = new HashMap<>(); + brokeraddrsB.put(MixAll.MASTER_ID,"127.0.0.1:10912"); + BrokerData brokerData1 = createBrokerData("BrokerB","zone2",brokeraddrsB); + BrokerData brokerData2 = createBrokerData("BrokerC","zone1",null); + topicRouteData.getBrokerDatas().add(brokerData1); + topicRouteData.getBrokerDatas().add(brokerData2); + response.setBody(RemotingSerializable.encode(topicRouteData)); + zoneRouteRPCHook.doAfterResponse("", request, response); + + topicRouteData.getFilterServerTable().put("127.0.0.1:10911",new ArrayList<>()); + response.setBody(RemotingSerializable.encode(topicRouteData)); + zoneRouteRPCHook.doAfterResponse("", request, response); + Assert.assertEquals(1,RemotingSerializable + .decode(response.getBody(), TopicRouteData.class) + .getFilterServerTable() + .size()); + + topicRouteData.getFilterServerTable().put("127.0.0.1:10912",new ArrayList<>()); + response.setBody(RemotingSerializable.encode(topicRouteData)); + zoneRouteRPCHook.doAfterResponse("", request, response); + Assert.assertEquals(1,RemotingSerializable + .decode(response.getBody(), TopicRouteData.class) + .getFilterServerTable() + .size()); + } + + private TopicRouteData createSampleTopicRouteData() { + TopicRouteData topicRouteData = new TopicRouteData(); + List<BrokerData> brokerDatas = new ArrayList<>(); + BrokerData brokerData = createBrokerData("BrokerA","zone1",new HashMap<>()); + List<QueueData> queueDatas = new ArrayList<>(); + QueueData queueData = createQueueData("BrokerA"); + queueDatas.add(queueData); + brokerDatas.add(brokerData); + topicRouteData.setBrokerDatas(brokerDatas); + topicRouteData.setQueueDatas(queueDatas); + return topicRouteData; + } + + private BrokerData createBrokerData(String brokerName,String zoneName,HashMap<Long,String> brokerAddrs) { + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName(brokerName); + brokerData.setZoneName(zoneName); + brokerData.setBrokerAddrs(brokerAddrs); + return brokerData; + } + + private QueueData createQueueData(String brokerName) { + QueueData queueData = new QueueData(); + queueData.setBrokerName(brokerName); + queueData.setReadQueueNums(8); + queueData.setWriteQueueNums(8); + queueData.setPerm(6); + return queueData; + } +} diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java index b52cf50740..5e58cfc124 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java @@ -416,9 +416,12 @@ public class RouteInfoManagerNewTest { } @Test - public void scanNotActiveBroker() { + public void scanNotActiveBroker() throws InterruptedException { registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic"); routeInfoManager.scanNotActiveBroker(); + registerBrokerWithNormalTopicAndExpire(BrokerBasicInfo.defaultBroker(),"TestTopic"); + Thread.sleep(30000); + routeInfoManager.scanNotActiveBroker(); } @Test @@ -589,6 +592,16 @@ public class RouteInfoManagerNewTest { assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull(); } + @Test + public void onChannelDestroyByBrokerInfo() { + registerBroker(BrokerBasicInfo.defaultBroker(), mock(Channel.class), null, "TestTopic", "TestTopic1"); + BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(DEFAULT_CLUSTER, DEFAULT_ADDR); + routeInfoManager.onChannelDestroy(brokerAddrInfo); + await().atMost(Duration.ofSeconds(5)).until(() -> routeInfoManager.blockedUnRegisterRequests() == 0); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull(); + assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull(); + } + @Test public void switchBrokerRole_ChannelDestroy() { final BrokerBasicInfo masterBroker = BrokerBasicInfo.defaultBroker().enableActingMaster(false); @@ -728,6 +741,23 @@ public class RouteInfoManagerNewTest { return registerBroker(brokerInfo, mock(Channel.class), topicConfigConcurrentHashMap, topics); } + private RegisterBrokerResult registerBrokerWithNormalTopicAndExpire(BrokerBasicInfo brokerInfo, String... topics) { + ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>(); + TopicConfig baseTopic = new TopicConfig("baseTopic"); + topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic); + for (final String topic : topics) { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setWriteQueueNums(8); + topicConfig.setTopicName(topic); + topicConfig.setPerm(6); + topicConfig.setReadQueueNums(8); + topicConfig.setOrder(false); + topicConfigConcurrentHashMap.put(topic, topicConfig); + } + + return registerBrokerWithExpiredTime(brokerInfo, mock(Channel.class), topicConfigConcurrentHashMap, topics); + } + private RegisterBrokerResult registerBrokerWithOrderTopic(BrokerBasicInfo brokerBasicInfo, String... topics) { ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>(); @@ -785,7 +815,7 @@ public class RouteInfoManagerNewTest { topicConfigSerializeWrapper.setDataVersion(brokerInfo.dataVersion); topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap); - RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker( + return routeInfoManager.registerBroker( brokerInfo.clusterName, brokerInfo.brokerAddr, brokerInfo.brokerName, @@ -795,7 +825,40 @@ public class RouteInfoManagerNewTest { null, brokerInfo.enableActingMaster, topicConfigSerializeWrapper, new ArrayList<>(), channel); - return registerBrokerResult; + } + + private RegisterBrokerResult registerBrokerWithExpiredTime(BrokerBasicInfo brokerInfo, Channel channel, + ConcurrentMap<String, TopicConfig> topicConfigConcurrentHashMap, String... topics) { + + if (topicConfigConcurrentHashMap == null) { + topicConfigConcurrentHashMap = new ConcurrentHashMap<>(); + TopicConfig baseTopic = new TopicConfig("baseTopic"); + topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic); + for (final String topic : topics) { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setWriteQueueNums(8); + topicConfig.setTopicName(topic); + topicConfig.setPerm(6); + topicConfig.setReadQueueNums(8); + topicConfig.setOrder(false); + topicConfigConcurrentHashMap.put(topic, topicConfig); + } + } + + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setDataVersion(brokerInfo.dataVersion); + topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap); + + return routeInfoManager.registerBroker( + brokerInfo.clusterName, + brokerInfo.brokerAddr, + brokerInfo.brokerName, + brokerInfo.brokerId, + brokerInfo.haAddr, + "", + 30000L, + brokerInfo.enableActingMaster, + topicConfigSerializeWrapper, new ArrayList<>(), channel); } private void registerSingleTopicWithBrokerName(String brokerName, String... topics) {