This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new acd56fc510 [ISSUE #7958] Fix proxy always return the first broker in 
findOneBroker (#7960)
acd56fc510 is described below

commit acd56fc5105cbed827d4094d87471ea9f6801634
Author: Lei Zhiyuan <leizhiy...@gmail.com>
AuthorDate: Sat Apr 6 16:31:51 2024 +0800

    [ISSUE #7958] Fix proxy always return the first broker in findOneBroker 
(#7960)
---
 .../service/metadata/ClusterMetadataService.java   |  8 ++++-
 .../metadata/ClusterMetadataServiceTest.java       | 37 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
index 226adeb6ec..70ce1d3480 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.proxy.service.metadata;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.LoadingCache;
+import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -70,6 +72,8 @@ public class ClusterMetadataService extends 
AbstractStartAndShutdown implements
 
     protected final static Acl EMPTY_ACL = new Acl();
 
+    protected final Random random = new Random();
+
 
     public ClusterMetadataService(TopicRouteService topicRouteService, 
MQClientAPIFactory mqClientAPIFactory) {
         this.topicRouteService = topicRouteService;
@@ -274,7 +278,9 @@ public class ClusterMetadataService extends 
AbstractStartAndShutdown implements
 
     protected Optional<BrokerData> findOneBroker(String topic) throws 
Exception {
         try {
-            return 
topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()),
 topic).getTopicRouteData().getBrokerDatas().stream().findAny();
+            List<BrokerData> brokerDatas = 
topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()),
 topic).getTopicRouteData().getBrokerDatas();
+            int skipNum = random.nextInt(brokerDatas.size());
+            return brokerDatas.stream().skip(skipNum).findFirst();
         } catch (Exception e) {
             if (TopicRouteHelper.isTopicNotExistError(e)) {
                 return Optional.empty();
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
index 98bf1104f8..5894f87199 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
@@ -18,10 +18,16 @@
 package org.apache.rocketmq.proxy.service.metadata;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.service.BaseServiceTest;
+import org.apache.rocketmq.proxy.service.route.MessageQueueView;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
 import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.junit.Before;
@@ -29,6 +35,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
@@ -38,6 +45,8 @@ public class ClusterMetadataServiceTest extends 
BaseServiceTest {
 
     private ClusterMetadataService clusterMetadataService;
 
+    protected static final String BROKER2_ADDR = "127.0.0.2:10911";
+
     @Before
     public void before() throws Throwable {
         super.before();
@@ -51,6 +60,16 @@ public class ClusterMetadataServiceTest extends 
BaseServiceTest {
         when(this.mqClientAPIExt.getSubscriptionGroupConfig(anyString(), 
eq(GROUP), anyLong())).thenReturn(new SubscriptionGroupConfig());
 
         this.clusterMetadataService = new 
ClusterMetadataService(this.topicRouteService, this.mqClientAPIFactory);
+
+        BrokerData brokerData2 = new BrokerData();
+        brokerData2.setBrokerName("brokerName2");
+        HashMap<Long, String> addrs = new HashMap<>();
+        addrs.put(MixAll.MASTER_ID, BROKER2_ADDR);
+        brokerData2.setBrokerAddrs(addrs);
+        brokerData2.setCluster(CLUSTER_NAME);
+        topicRouteData.getBrokerDatas().add(brokerData2);
+        when(this.topicRouteService.getAllMessageQueueView(any(), 
eq(TOPIC))).thenReturn(new MessageQueueView(CLUSTER_NAME, topicRouteData, 
null));
+
     }
 
     @Test
@@ -70,4 +89,22 @@ public class ClusterMetadataServiceTest extends 
BaseServiceTest {
         
assertNotNull(this.clusterMetadataService.getSubscriptionGroupConfig(ctx, 
GROUP));
         assertEquals(1, 
this.clusterMetadataService.subscriptionGroupConfigCache.asMap().size());
     }
+
+    @Test
+    public void findOneBroker() {
+
+        Set<String> resultBrokerNames = new HashSet<>();
+        // run 1000 times to test the random
+        for (int i = 0; i < 1000; i++) {
+            Optional<BrokerData> brokerData = null;
+            try {
+                brokerData = this.clusterMetadataService.findOneBroker(TOPIC);
+                resultBrokerNames.add(brokerData.get().getBrokerName());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        // we should choose two brokers
+        assertEquals(2, resultBrokerNames.size());
+    }
 }

Reply via email to