This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new acd56fc510 [ISSUE #7958] Fix proxy always return the first broker in findOneBroker (#7960) acd56fc510 is described below commit acd56fc5105cbed827d4094d87471ea9f6801634 Author: Lei Zhiyuan <leizhiy...@gmail.com> AuthorDate: Sat Apr 6 16:31:51 2024 +0800 [ISSUE #7958] Fix proxy always return the first broker in findOneBroker (#7960) --- .../service/metadata/ClusterMetadataService.java | 8 ++++- .../metadata/ClusterMetadataServiceTest.java | 37 ++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java index 226adeb6ec..70ce1d3480 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java @@ -19,7 +19,9 @@ package org.apache.rocketmq.proxy.service.metadata; import com.google.common.cache.CacheBuilder; import com.google.common.cache.LoadingCache; +import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -70,6 +72,8 @@ public class ClusterMetadataService extends AbstractStartAndShutdown implements protected final static Acl EMPTY_ACL = new Acl(); + protected final Random random = new Random(); + public ClusterMetadataService(TopicRouteService topicRouteService, MQClientAPIFactory mqClientAPIFactory) { this.topicRouteService = topicRouteService; @@ -274,7 +278,9 @@ public class ClusterMetadataService extends AbstractStartAndShutdown implements protected Optional<BrokerData> findOneBroker(String topic) throws Exception { try { - return 
topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()), topic).getTopicRouteData().getBrokerDatas().stream().findAny(); + List<BrokerData> brokerDatas = topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()), topic).getTopicRouteData().getBrokerDatas(); + int skipNum = random.nextInt(brokerDatas.size()); + return brokerDatas.stream().skip(skipNum).findFirst(); } catch (Exception e) { if (TopicRouteHelper.isTopicNotExistError(e)) { return Optional.empty(); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java index 98bf1104f8..5894f87199 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java @@ -18,10 +18,16 @@ package org.apache.rocketmq.proxy.service.metadata; import java.util.HashMap; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.service.BaseServiceTest; +import org.apache.rocketmq.proxy.service.route.MessageQueueView; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.junit.Before; @@ -29,6 +35,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static 
org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -38,6 +45,8 @@ public class ClusterMetadataServiceTest extends BaseServiceTest { private ClusterMetadataService clusterMetadataService; + protected static final String BROKER2_ADDR = "127.0.0.2:10911"; + @Before public void before() throws Throwable { super.before(); @@ -51,6 +60,16 @@ public class ClusterMetadataServiceTest extends BaseServiceTest { when(this.mqClientAPIExt.getSubscriptionGroupConfig(anyString(), eq(GROUP), anyLong())).thenReturn(new SubscriptionGroupConfig()); this.clusterMetadataService = new ClusterMetadataService(this.topicRouteService, this.mqClientAPIFactory); + + BrokerData brokerData2 = new BrokerData(); + brokerData2.setBrokerName("brokerName2"); + HashMap<Long, String> addrs = new HashMap<>(); + addrs.put(MixAll.MASTER_ID, BROKER2_ADDR); + brokerData2.setBrokerAddrs(addrs); + brokerData2.setCluster(CLUSTER_NAME); + topicRouteData.getBrokerDatas().add(brokerData2); + when(this.topicRouteService.getAllMessageQueueView(any(), eq(TOPIC))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData, null)); + } @Test @@ -70,4 +89,22 @@ public class ClusterMetadataServiceTest extends BaseServiceTest { assertNotNull(this.clusterMetadataService.getSubscriptionGroupConfig(ctx, GROUP)); assertEquals(1, this.clusterMetadataService.subscriptionGroupConfigCache.asMap().size()); } + + @Test + public void findOneBroker() { + + Set<String> resultBrokerNames = new HashSet<>(); + // run 1000 times to test the random + for (int i = 0; i < 1000; i++) { + Optional<BrokerData> brokerData = null; + try { + brokerData = this.clusterMetadataService.findOneBroker(TOPIC); + resultBrokerNames.add(brokerData.get().getBrokerName()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + // we should choose two brokers + assertEquals(2, resultBrokerNames.size()); + } }