This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new e805671c89 [ISSUE #9206] Fix slave sync topic sub in rocksdb ha (#9207) e805671c89 is described below commit e805671c89eb1e647f1bd429b7538761bf69ad27 Author: fujian-zfj <2573259...@qq.com> AuthorDate: Wed Feb 26 11:27:40 2025 +0800 [ISSUE #9206] Fix slave sync topic sub in rocksdb ha (#9207) * typo in readme [ecosystem] * fix slave sync topic and sub in rocksdb ha mode --- .../config/v1/RocksDBSubscriptionGroupManager.java | 1 - .../rocketmq/broker/slave/SlaveSynchronize.java | 44 +++++++++++++++------- .../subscription/SubscriptionGroupManager.java | 2 +- .../rocketmq/broker/topic/TopicConfigManager.java | 2 +- 4 files changed, 32 insertions(+), 17 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java index ff47152569..b208169e41 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java @@ -78,7 +78,6 @@ public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager { return true; } - private boolean merge() { if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { log.info("subGroup json file does not exist, so skip merge"); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index bfb5c9dcd0..f75fd21610 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.broker.slave; import java.io.IOException; +import java.util.Iterator; +import 
java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -24,6 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; +import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; @@ -77,20 +80,28 @@ public class SlaveSynchronize { try { TopicConfigAndMappingSerializeWrapper topicWrapper = this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); - if (!this.brokerController.getTopicConfigManager().getDataVersion() - .equals(topicWrapper.getDataVersion())) { + TopicConfigManager topicConfigManager = this.brokerController.getTopicConfigManager(); + if (!topicConfigManager.getDataVersion().equals(topicWrapper.getDataVersion())) { - this.brokerController.getTopicConfigManager().getDataVersion() - .assignNewOne(topicWrapper.getDataVersion()); + topicConfigManager.getDataVersion().assignNewOne(topicWrapper.getDataVersion()); ConcurrentMap<String, TopicConfig> newTopicConfigTable = topicWrapper.getTopicConfigTable(); + ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigManager.getTopicConfigTable(); + //delete - ConcurrentMap<String, TopicConfig> topicConfigTable = this.brokerController.getTopicConfigManager().getTopicConfigTable(); - topicConfigTable.entrySet().removeIf(item -> !newTopicConfigTable.containsKey(item.getKey())); + Iterator<Map.Entry<String, TopicConfig>> iterator = topicConfigTable.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, TopicConfig> entry = iterator.next(); + if (!newTopicConfigTable.containsKey(entry.getKey())) { + iterator.remove(); + } + topicConfigManager.deleteTopicConfig(entry.getKey()); + } + 
//update - topicConfigTable.putAll(newTopicConfigTable); + newTopicConfigTable.values().forEach(topicConfigManager::updateSingleTopicConfigWithoutPersist); - this.brokerController.getTopicConfigManager().persist(); + topicConfigManager.persist(); } if (topicWrapper.getTopicQueueMappingDetailMap() != null && !topicWrapper.getMappingDataVersion().equals(this.brokerController.getTopicQueueMappingManager().getDataVersion())) { @@ -165,19 +176,24 @@ public class SlaveSynchronize { if (!this.brokerController.getSubscriptionGroupManager().getDataVersion() .equals(subscriptionWrapper.getDataVersion())) { - SubscriptionGroupManager subscriptionGroupManager = - this.brokerController.getSubscriptionGroupManager(); - subscriptionGroupManager.getDataVersion().assignNewOne( - subscriptionWrapper.getDataVersion()); + SubscriptionGroupManager subscriptionGroupManager = this.brokerController.getSubscriptionGroupManager(); + subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion()); ConcurrentMap<String, SubscriptionGroupConfig> curSubscriptionGroupTable = subscriptionGroupManager.getSubscriptionGroupTable(); ConcurrentMap<String, SubscriptionGroupConfig> newSubscriptionGroupTable = subscriptionWrapper.getSubscriptionGroupTable(); // delete - curSubscriptionGroupTable.entrySet().removeIf(e -> !newSubscriptionGroupTable.containsKey(e.getKey())); + Iterator<Map.Entry<String, SubscriptionGroupConfig>> iterator = curSubscriptionGroupTable.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next(); + if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) { + iterator.remove(); + } + subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey()); + } // update - curSubscriptionGroupTable.putAll(newSubscriptionGroupTable); + newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::updateSubscriptionGroupConfigWithoutPersist); // persist 
subscriptionGroupManager.persist(); LOGGER.info("Update slave Subscription Group from master, {}", masterAddrBak); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index f62a3e4a09..d85342e1a1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -143,7 +143,7 @@ public class SubscriptionGroupManager extends ConfigManager { this.persist(); } - protected void updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) { + public void updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) { Map<String, String> newAttributes = request(config); Map<String, String> currentAttributes = current(config.getGroupName()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 4530c10002..b20cafc101 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -497,7 +497,7 @@ public class TopicConfigManager extends ConfigManager { } } - protected void updateSingleTopicConfigWithoutPersist(final TopicConfig topicConfig) { + public void updateSingleTopicConfigWithoutPersist(final TopicConfig topicConfig) { checkNotNull(topicConfig, "topicConfig shouldn't be null"); Map<String, String> newAttributes = request(topicConfig);