This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e805671c89 [ISSUE #9206] Fix slave sync topic sub in rocksdb ha (#9207)
e805671c89 is described below

commit e805671c89eb1e647f1bd429b7538761bf69ad27
Author: fujian-zfj <2573259...@qq.com>
AuthorDate: Wed Feb 26 11:27:40 2025 +0800

    [ISSUE #9206] Fix slave sync topic sub in rocksdb ha (#9207)
    
    * typo int readme[ecosystem]
    
    * fix slave sunc topic and sub in rocksdb ha mode
---
 .../config/v1/RocksDBSubscriptionGroupManager.java |  1 -
 .../rocketmq/broker/slave/SlaveSynchronize.java    | 44 +++++++++++++++-------
 .../subscription/SubscriptionGroupManager.java     |  2 +-
 .../rocketmq/broker/topic/TopicConfigManager.java  |  2 +-
 4 files changed, 32 insertions(+), 17 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index ff47152569..b208169e41 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -78,7 +78,6 @@ public class RocksDBSubscriptionGroupManager extends 
SubscriptionGroupManager {
         return true;
     }
 
-
     private boolean merge() {
         if (!UtilAll.isPathExists(this.configFilePath()) && 
!UtilAll.isPathExists(this.configFilePath() + ".bak")) {
             log.info("subGroup json file does not exist, so skip merge");
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java 
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index bfb5c9dcd0..f75fd21610 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -17,6 +17,8 @@
 package org.apache.rocketmq.broker.slave;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -24,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -77,20 +80,28 @@ public class SlaveSynchronize {
             try {
                 TopicConfigAndMappingSerializeWrapper topicWrapper =
                         
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
-                if 
(!this.brokerController.getTopicConfigManager().getDataVersion()
-                        .equals(topicWrapper.getDataVersion())) {
+                TopicConfigManager topicConfigManager = 
this.brokerController.getTopicConfigManager();
+                if 
(!topicConfigManager.getDataVersion().equals(topicWrapper.getDataVersion())) {
 
-                    
this.brokerController.getTopicConfigManager().getDataVersion()
-                            .assignNewOne(topicWrapper.getDataVersion());
+                    
topicConfigManager.getDataVersion().assignNewOne(topicWrapper.getDataVersion());
 
                     ConcurrentMap<String, TopicConfig> newTopicConfigTable = 
topicWrapper.getTopicConfigTable();
+                    ConcurrentMap<String, TopicConfig> topicConfigTable = 
topicConfigManager.getTopicConfigTable();
+
                     //delete
-                    ConcurrentMap<String, TopicConfig> topicConfigTable = 
this.brokerController.getTopicConfigManager().getTopicConfigTable();
-                    topicConfigTable.entrySet().removeIf(item -> 
!newTopicConfigTable.containsKey(item.getKey()));
+                    Iterator<Map.Entry<String, TopicConfig>> iterator = 
topicConfigTable.entrySet().iterator();
+                    while (iterator.hasNext()) {
+                        Map.Entry<String, TopicConfig> entry = iterator.next();
+                        if (!newTopicConfigTable.containsKey(entry.getKey())) {
+                            iterator.remove();
+                        }
+                        topicConfigManager.deleteTopicConfig(entry.getKey());
+                    }
+
                     //update
-                    topicConfigTable.putAll(newTopicConfigTable);
+                    
newTopicConfigTable.values().forEach(topicConfigManager::updateSingleTopicConfigWithoutPersist);
 
-                    this.brokerController.getTopicConfigManager().persist();
+                    topicConfigManager.persist();
                 }
                 if (topicWrapper.getTopicQueueMappingDetailMap() != null
                         && 
!topicWrapper.getMappingDataVersion().equals(this.brokerController.getTopicQueueMappingManager().getDataVersion()))
 {
@@ -165,19 +176,24 @@ public class SlaveSynchronize {
 
                 if 
(!this.brokerController.getSubscriptionGroupManager().getDataVersion()
                         .equals(subscriptionWrapper.getDataVersion())) {
-                    SubscriptionGroupManager subscriptionGroupManager =
-                            
this.brokerController.getSubscriptionGroupManager();
-                    subscriptionGroupManager.getDataVersion().assignNewOne(
-                            subscriptionWrapper.getDataVersion());
+                    SubscriptionGroupManager subscriptionGroupManager = 
this.brokerController.getSubscriptionGroupManager();
+                    
subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion());
 
                     ConcurrentMap<String, SubscriptionGroupConfig> 
curSubscriptionGroupTable =
                             
subscriptionGroupManager.getSubscriptionGroupTable();
                     ConcurrentMap<String, SubscriptionGroupConfig> 
newSubscriptionGroupTable =
                             subscriptionWrapper.getSubscriptionGroupTable();
                     // delete
-                    curSubscriptionGroupTable.entrySet().removeIf(e -> 
!newSubscriptionGroupTable.containsKey(e.getKey()));
+                    Iterator<Map.Entry<String, SubscriptionGroupConfig>> 
iterator = curSubscriptionGroupTable.entrySet().iterator();
+                    while (iterator.hasNext()) {
+                        Map.Entry<String, SubscriptionGroupConfig> configEntry 
= iterator.next();
+                        if 
(!newSubscriptionGroupTable.containsKey(configEntry.getKey())) {
+                            iterator.remove();
+                        }
+                        
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
+                    }
                     // update
-                    
curSubscriptionGroupTable.putAll(newSubscriptionGroupTable);
+                    
newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::updateSubscriptionGroupConfigWithoutPersist);
                     // persist
                     subscriptionGroupManager.persist();
                     LOGGER.info("Update slave Subscription Group from master, 
{}", masterAddrBak);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index f62a3e4a09..d85342e1a1 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -143,7 +143,7 @@ public class SubscriptionGroupManager extends ConfigManager 
{
         this.persist();
     }
 
-    protected void 
updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
+    public void 
updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
         Map<String, String> newAttributes = request(config);
         Map<String, String> currentAttributes = current(config.getGroupName());
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 4530c10002..b20cafc101 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -497,7 +497,7 @@ public class TopicConfigManager extends ConfigManager {
         }
     }
 
-    protected void updateSingleTopicConfigWithoutPersist(final TopicConfig 
topicConfig) {
+    public void updateSingleTopicConfigWithoutPersist(final TopicConfig 
topicConfig) {
         checkNotNull(topicConfig, "topicConfig shouldn't be null");
 
         Map<String, String> newAttributes = request(topicConfig);

Reply via email to