This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0dbd0772b9 [ISSUE #7326] Split the request to register to the 
nameserver (#7325)
0dbd0772b9 is described below

commit 0dbd0772b99f618f757d42cd64542b83e2100e4f
Author: Ziyi Tan <[email protected]>
AuthorDate: Mon Sep 11 15:48:07 2023 +0800

    [ISSUE #7326] Split the request to register to the nameserver (#7325)
    
    Signed-off-by: Ziy1-Tan <[email protected]>
---
 .../apache/rocketmq/broker/BrokerController.java   | 41 ++++++++++++----------
 .../rocketmq/broker/topic/TopicConfigManager.java  | 21 +++++++++++
 .../org/apache/rocketmq/common/BrokerConfig.java   | 24 +++++++++++++
 .../test/route/CreateAndUpdateTopicIT.java         | 31 ++++++++++++++++
 4 files changed, 99 insertions(+), 18 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 275b64b1ab..9e49f636d2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1765,29 +1765,34 @@ public class BrokerController {
     }
 
     public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
boolean oneway, boolean forceRegister) {
+        ConcurrentMap<String, TopicConfig> topicConfigMap = 
this.getTopicConfigManager().getTopicConfigTable();
+        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<>();
 
-        TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new 
TopicConfigAndMappingSerializeWrapper();
-
-        
topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
-        
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
-
-        
topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
-            entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), 
TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
-        ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
-
-        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
-            || 
!PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
-            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<>();
-            for (TopicConfig topicConfig : 
topicConfigWrapper.getTopicConfigTable().values()) {
-                TopicConfig tmp =
+        for (TopicConfig topicConfig : topicConfigMap.values()) {
+            if 
(!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+                || 
!PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+                topicConfigTable.put(topicConfig.getTopicName(),
                     new TopicConfig(topicConfig.getTopicName(), 
topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
-                        topicConfig.getPerm() & 
this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
-                topicConfigTable.put(topicConfig.getTopicName(), tmp);
+                        topicConfig.getPerm() & 
getBrokerConfig().getBrokerPermission()));
+            } else {
+                topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+            }
+
+            if (this.brokerConfig.isEnableSplitRegistration()
+                && topicConfigTable.size() >= 
this.brokerConfig.getSplitRegistrationSize()) {
+                TopicConfigAndMappingSerializeWrapper topicConfigWrapper = 
this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable);
+                doRegisterBrokerAll(checkOrderConfig, oneway, 
topicConfigWrapper);
+                topicConfigTable.clear();
             }
-            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
         }
 
-        if (forceRegister || 
needRegister(this.brokerConfig.getBrokerClusterName(),
+        Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = 
this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream()
+            .map(entry -> new 
AbstractMap.SimpleImmutableEntry<>(entry.getKey(), 
TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        TopicConfigAndMappingSerializeWrapper topicConfigWrapper = 
this.getTopicConfigManager().
+            buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap);
+        if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || 
needRegister(this.brokerConfig.getBrokerClusterName(),
             this.getBrokerAddr(),
             this.brokerConfig.getBrokerName(),
             this.brokerConfig.getBrokerId(),
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 754605438d..8537929be7 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.collect.ImmutableMap;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -47,7 +48,9 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.apache.rocketmq.remoting.protocol.body.KVTable;
+import 
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -609,6 +612,24 @@ public class TopicConfigManager extends ConfigManager {
         return topicConfigSerializeWrapper;
     }
 
+    public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(final 
ConcurrentMap<String, TopicConfig> topicConfigTable) {
+        return buildSerializeWrapper(topicConfigTable, Maps.newHashMap());
+    }
+
+    public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(
+        final ConcurrentMap<String, TopicConfig> topicConfigTable,
+        final Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap
+    ) {
+        TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new 
TopicConfigAndMappingSerializeWrapper();
+        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
+        
topicConfigWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
+        topicConfigWrapper.setDataVersion(this.getDataVersion());
+        if 
(this.brokerController.getBrokerConfig().isEnableSplitRegistration()) {
+            this.getDataVersion().nextVersion();
+        }
+        return topicConfigWrapper;
+    }
+
     @Override
     public String encode() {
         return encode(false);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 45d26b29cb..0d248c4e17 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -396,6 +396,14 @@ public class BrokerConfig extends BrokerIdentity {
 
     private boolean enableMixedMessageType = false;
 
+    /**
+     * This flag and deleteTopicWithBrokerRegistration flag in the NameServer 
cannot be set to true at the same time,
+     * otherwise there will be a loss of routing
+     */
+    private boolean enableSplitRegistration = false;
+
+    private int splitRegistrationSize = 800;
+
     public long getMaxPopPollingSize() {
         return maxPopPollingSize;
     }
@@ -1731,4 +1739,20 @@ public class BrokerConfig extends BrokerIdentity {
     public void setEnableMixedMessageType(boolean enableMixedMessageType) {
         this.enableMixedMessageType = enableMixedMessageType;
     }
+
+    public boolean isEnableSplitRegistration() {
+        return enableSplitRegistration;
+    }
+
+    public void setEnableSplitRegistration(boolean enableSplitRegistration) {
+        this.enableSplitRegistration = enableSplitRegistration;
+    }
+
+    public int getSplitRegistrationSize() {
+        return splitRegistrationSize;
+    }
+
+    public void setSplitRegistrationSize(int splitRegistrationSize) {
+        this.splitRegistrationSize = splitRegistrationSize;
+    }
 }
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java 
b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
index 7e3c7b871d..2370e68c0f 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.test.route;
 
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 import org.apache.rocketmq.test.base.BaseConf;
 import org.apache.rocketmq.test.util.MQAdminTestUtils;
@@ -111,4 +112,34 @@ public class CreateAndUpdateTopicIT extends BaseConf {
         
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
         
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
     }
+
+    @Test
+    public void testCreateOrUpdateTopic_EnableSplitRegistration() {
+        brokerController1.getBrokerConfig().setEnableSplitRegistration(true);
+        brokerController2.getBrokerConfig().setEnableSplitRegistration(true);
+        brokerController3.getBrokerConfig().setEnableSplitRegistration(true);
+
+        String testTopic = "test-topic-";
+
+        for (int i = 0; i < 1000; i++) {
+            TopicConfig topicConfig = new TopicConfig(testTopic + i, 8, 8);
+            
brokerController1.getTopicConfigManager().updateTopicConfig(topicConfig);
+            
brokerController2.getTopicConfigManager().updateTopicConfig(topicConfig);
+            
brokerController3.getTopicConfigManager().updateTopicConfig(topicConfig);
+        }
+
+        brokerController1.registerBrokerAll(false, true, true);
+        brokerController2.registerBrokerAll(false, true, true);
+        brokerController3.registerBrokerAll(false, true, true);
+
+        for (int i = 0; i < 1000; i++) {
+            TopicRouteData route = 
MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic + i);
+            assertThat(route.getBrokerDatas()).hasSize(3);
+            assertThat(route.getQueueDatas()).hasSize(3);
+        }
+
+        brokerController1.getBrokerConfig().setEnableSplitRegistration(false);
+        brokerController2.getBrokerConfig().setEnableSplitRegistration(false);
+        brokerController3.getBrokerConfig().setEnableSplitRegistration(false);
+    }
 }

Reply via email to