This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new f6249e5b3a [ISSUE #6426] Fix slave broker SubscriptionGroupConfig and MessageRequestMode updating atomically (#8983) f6249e5b3a is described below commit f6249e5b3a171be1cd0051732be77fd55ee4eb97 Author: Aurora Twinkle <foreverlove...@gmail.com> AuthorDate: Mon Jan 13 14:45:20 2025 +0800 [ISSUE #6426] Fix slave broker SubscriptionGroupConfig and MessageRequestMode updating atomically (#8983) * fix[slave]:Make SubscriptionGroupConfig and MessageRequestMode updating atomically * add unit test * fix ut --------- Co-authored-by: duanlinlin <duanlinl...@xiaohongshu.com> --- .../rocketmq/broker/slave/SlaveSynchronize.java | 30 ++++- .../broker/slave/SlaveSynchronizeAtomicTest.java | 141 +++++++++++++++++++++ 2 files changed, 164 insertions(+), 7 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index aa77b773ee..bfb5c9dcd0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.slave; import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang3.StringUtils; @@ -30,8 +31,10 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; +import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.timer.TimerCheckpoint; import org.apache.rocketmq.store.timer.TimerMetrics; @@ -166,9 +169,16 @@ public class SlaveSynchronize { this.brokerController.getSubscriptionGroupManager(); subscriptionGroupManager.getDataVersion().assignNewOne( subscriptionWrapper.getDataVersion()); - subscriptionGroupManager.getSubscriptionGroupTable().clear(); - subscriptionGroupManager.getSubscriptionGroupTable().putAll( - subscriptionWrapper.getSubscriptionGroupTable()); + + ConcurrentMap<String, SubscriptionGroupConfig> curSubscriptionGroupTable = + subscriptionGroupManager.getSubscriptionGroupTable(); + ConcurrentMap<String, SubscriptionGroupConfig> newSubscriptionGroupTable = + subscriptionWrapper.getSubscriptionGroupTable(); + // delete + curSubscriptionGroupTable.entrySet().removeIf(e -> !newSubscriptionGroupTable.containsKey(e.getKey())); + // update + curSubscriptionGroupTable.putAll(newSubscriptionGroupTable); + // persist subscriptionGroupManager.persist(); LOGGER.info("Update slave Subscription Group from master, {}", masterAddrBak); } @@ -187,10 +197,16 @@ public class SlaveSynchronize { MessageRequestModeManager messageRequestModeManager = this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager(); - messageRequestModeManager.getMessageRequestModeMap().clear(); - messageRequestModeManager.getMessageRequestModeMap().putAll( - messageRequestModeSerializeWrapper.getMessageRequestModeMap() - ); + ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>> curMessageRequestModeMap = + messageRequestModeManager.getMessageRequestModeMap(); + ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>> newMessageRequestModeMap = + messageRequestModeSerializeWrapper.getMessageRequestModeMap(); + + // delete + 
curMessageRequestModeMap.entrySet().removeIf(e -> !newMessageRequestModeMap.containsKey(e.getKey())); + // update + curMessageRequestModeMap.putAll(newMessageRequestModeMap); + // persist messageRequestModeManager.persist(); LOGGER.info("Update slave Message Request Mode from master, {}", masterAddrBak); } catch (Exception e) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java new file mode 100644 index 0000000000..75db22e7e7 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.broker.slave; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager; +import org.apache.rocketmq.broker.out.BrokerOuterAPI; +import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor; +import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; +import org.apache.rocketmq.broker.topic.TopicConfigManager; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.protocol.DataVersion; +import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class SlaveSynchronizeAtomicTest { + @Spy + private BrokerController brokerController = + new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), + new MessageStoreConfig()); + + private SlaveSynchronize 
slaveSynchronize; + + @Mock + private BrokerOuterAPI brokerOuterAPI; + + @Mock + private TopicConfigManager topicConfigManager; + + + @Mock + private SubscriptionGroupManager subscriptionGroupManager; + + @Mock + private QueryAssignmentProcessor queryAssignmentProcessor; + + @Mock + private MessageRequestModeManager messageRequestModeManager; + + + private static final String BROKER_ADDR = "127.0.0.1:10911"; + private final SubscriptionGroupWrapper subscriptionGroupWrapper = createSubscriptionGroupWrapper(); + private final MessageRequestModeSerializeWrapper requestModeSerializeWrapper = createMessageRequestModeWrapper(); + private final DataVersion dataVersion = new DataVersion(); + + @Before + public void init() { + for (int i = 0; i < 100000; i++) { + subscriptionGroupWrapper.getSubscriptionGroupTable().put("group" + i, new SubscriptionGroupConfig()); + } + for (int i = 0; i < 100000; i++) { + requestModeSerializeWrapper.getMessageRequestModeMap().put("topic" + i, new ConcurrentHashMap<>()); + } + when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI); + when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); + when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager); + when(subscriptionGroupManager.getDataVersion()).thenReturn(dataVersion); + when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn( + subscriptionGroupWrapper.getSubscriptionGroupTable()); + slaveSynchronize = new SlaveSynchronize(brokerController); + slaveSynchronize.setMasterAddr(BROKER_ADDR); + } + + private SubscriptionGroupWrapper createSubscriptionGroupWrapper() { + SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper(); + wrapper.setSubscriptionGroupTable(new ConcurrentHashMap<>()); + DataVersion dataVersion = new DataVersion(); + dataVersion.setStateVersion(1L); + wrapper.setDataVersion(dataVersion); + return wrapper; + } + + private MessageRequestModeSerializeWrapper 
createMessageRequestModeWrapper() { + MessageRequestModeSerializeWrapper wrapper = new MessageRequestModeSerializeWrapper(); + wrapper.setMessageRequestModeMap(new ConcurrentHashMap<>()); + return wrapper; + } + + @Test + public void testSyncAtomically() + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, + InterruptedException { + when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper); + when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(requestModeSerializeWrapper); + + CountDownLatch countDownLatch = new CountDownLatch(1); + new Thread(() -> { + while (countDownLatch.getCount() > 0) { + dataVersion.nextVersion(); + try { + slaveSynchronize.syncAll(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }).start(); + + for (int i = 0; i < 10000000; i++) { + Assert.assertTrue(subscriptionGroupWrapper.getSubscriptionGroupTable() + .containsKey("group" + ThreadLocalRandom.current().nextInt(0, 100000))); + Assert.assertTrue(requestModeSerializeWrapper.getMessageRequestModeMap() + .containsKey("topic" + ThreadLocalRandom.current().nextInt(0, 100000))); + } + countDownLatch.countDown(); + } +}