This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f6249e5b3a [ISSUE #6426] Fix slave broker SubscriptionGroupConfig and 
MessageRequestMode updating atomically (#8983)
f6249e5b3a is described below

commit f6249e5b3a171be1cd0051732be77fd55ee4eb97
Author: Aurora Twinkle <foreverlove...@gmail.com>
AuthorDate: Mon Jan 13 14:45:20 2025 +0800

    [ISSUE #6426] Fix slave broker SubscriptionGroupConfig and 
MessageRequestMode updating atomically (#8983)
    
    * fix[slave]: Make SubscriptionGroupConfig and MessageRequestMode update 
atomically
    
    * add unit test
    
    * fix ut
    
    ---------
    
    Co-authored-by: duanlinlin <duanlinl...@xiaohongshu.com>
---
 .../rocketmq/broker/slave/SlaveSynchronize.java    |  30 ++++-
 .../broker/slave/SlaveSynchronizeAtomicTest.java   | 141 +++++++++++++++++++++
 2 files changed, 164 insertions(+), 7 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java 
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index aa77b773ee..bfb5c9dcd0 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.broker.slave;
 
 import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
@@ -30,8 +31,10 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import 
org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
 import 
org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
+import 
org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
 import 
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMetrics;
@@ -166,9 +169,16 @@ public class SlaveSynchronize {
                             
this.brokerController.getSubscriptionGroupManager();
                     subscriptionGroupManager.getDataVersion().assignNewOne(
                             subscriptionWrapper.getDataVersion());
-                    
subscriptionGroupManager.getSubscriptionGroupTable().clear();
-                    
subscriptionGroupManager.getSubscriptionGroupTable().putAll(
-                            subscriptionWrapper.getSubscriptionGroupTable());
+
+                    ConcurrentMap<String, SubscriptionGroupConfig> 
curSubscriptionGroupTable =
+                            
subscriptionGroupManager.getSubscriptionGroupTable();
+                    ConcurrentMap<String, SubscriptionGroupConfig> 
newSubscriptionGroupTable =
+                            subscriptionWrapper.getSubscriptionGroupTable();
+                    // delete
+                    curSubscriptionGroupTable.entrySet().removeIf(e -> 
!newSubscriptionGroupTable.containsKey(e.getKey()));
+                    // update
+                    
curSubscriptionGroupTable.putAll(newSubscriptionGroupTable);
+                    // persist
                     subscriptionGroupManager.persist();
                     LOGGER.info("Update slave Subscription Group from master, 
{}", masterAddrBak);
                 }
@@ -187,10 +197,16 @@ public class SlaveSynchronize {
 
                 MessageRequestModeManager messageRequestModeManager =
                         
this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager();
-                messageRequestModeManager.getMessageRequestModeMap().clear();
-                messageRequestModeManager.getMessageRequestModeMap().putAll(
-                        
messageRequestModeSerializeWrapper.getMessageRequestModeMap()
-                );
+                ConcurrentHashMap<String, ConcurrentHashMap<String, 
SetMessageRequestModeRequestBody>> curMessageRequestModeMap =
+                        messageRequestModeManager.getMessageRequestModeMap();
+                ConcurrentHashMap<String, ConcurrentHashMap<String, 
SetMessageRequestModeRequestBody>> newMessageRequestModeMap =
+                        
messageRequestModeSerializeWrapper.getMessageRequestModeMap();
+
+                // delete
+                curMessageRequestModeMap.entrySet().removeIf(e -> 
!newMessageRequestModeMap.containsKey(e.getKey()));
+                // update
+                curMessageRequestModeMap.putAll(newMessageRequestModeMap);
+                // persist
                 messageRequestModeManager.persist();
                 LOGGER.info("Update slave Message Request Mode from master, 
{}", masterAddrBak);
             } catch (Exception e) {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
new file mode 100644
index 0000000000..75db22e7e7
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.slave;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import 
org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SlaveSynchronizeAtomicTest {
+    @Spy
+    private BrokerController brokerController =
+            new BrokerController(new BrokerConfig(), new NettyServerConfig(), 
new NettyClientConfig(),
+                    new MessageStoreConfig());
+
+    private SlaveSynchronize slaveSynchronize;
+
+    @Mock
+    private BrokerOuterAPI brokerOuterAPI;
+
+    @Mock
+    private TopicConfigManager topicConfigManager;
+
+
+    @Mock
+    private SubscriptionGroupManager subscriptionGroupManager;
+
+    @Mock
+    private QueryAssignmentProcessor queryAssignmentProcessor;
+
+    @Mock
+    private MessageRequestModeManager messageRequestModeManager;
+
+
+    private static final String BROKER_ADDR = "127.0.0.1:10911";
+    private final SubscriptionGroupWrapper subscriptionGroupWrapper = 
createSubscriptionGroupWrapper();
+    private final MessageRequestModeSerializeWrapper 
requestModeSerializeWrapper = createMessageRequestModeWrapper();
+    private final DataVersion dataVersion = new DataVersion();
+
+    @Before
+    public void init() {
+        for (int i = 0; i < 100000; i++) {
+            subscriptionGroupWrapper.getSubscriptionGroupTable().put("group" + 
i, new SubscriptionGroupConfig());
+        }
+        for (int i = 0; i < 100000; i++) {
+            requestModeSerializeWrapper.getMessageRequestModeMap().put("topic" 
+ i, new ConcurrentHashMap<>());
+        }
+        when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
+        
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+        
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
+        
when(subscriptionGroupManager.getDataVersion()).thenReturn(dataVersion);
+        when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(
+                subscriptionGroupWrapper.getSubscriptionGroupTable());
+        slaveSynchronize = new SlaveSynchronize(brokerController);
+        slaveSynchronize.setMasterAddr(BROKER_ADDR);
+    }
+
+    private SubscriptionGroupWrapper createSubscriptionGroupWrapper() {
+        SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper();
+        wrapper.setSubscriptionGroupTable(new ConcurrentHashMap<>());
+        DataVersion dataVersion = new DataVersion();
+        dataVersion.setStateVersion(1L);
+        wrapper.setDataVersion(dataVersion);
+        return wrapper;
+    }
+
+    private MessageRequestModeSerializeWrapper 
createMessageRequestModeWrapper() {
+        MessageRequestModeSerializeWrapper wrapper = new 
MessageRequestModeSerializeWrapper();
+        wrapper.setMessageRequestModeMap(new ConcurrentHashMap<>());
+        return wrapper;
+    }
+
+    @Test
+    public void testSyncAtomically()
+            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException,
+            InterruptedException {
+        
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper);
+        
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(requestModeSerializeWrapper);
+
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        new Thread(() -> {
+            while (countDownLatch.getCount() > 0) {
+                dataVersion.nextVersion();
+                try {
+                    slaveSynchronize.syncAll();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+
+        for (int i = 0; i < 10000000; i++) {
+            
Assert.assertTrue(subscriptionGroupWrapper.getSubscriptionGroupTable()
+                    .containsKey("group" + 
ThreadLocalRandom.current().nextInt(0, 100000)));
+            
Assert.assertTrue(requestModeSerializeWrapper.getMessageRequestModeMap()
+                    .containsKey("topic" + 
ThreadLocalRandom.current().nextInt(0, 100000)));
+        }
+        countDownLatch.countDown();
+    }
+}

Reply via email to