This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 50651ef8ddc66e59ccc063022d253562b94a8370 Author: nowinkey <[email protected]> AuthorDate: Mon Feb 13 21:12:33 2023 +0800 Add integration test --- .../test/smoke/NormalMessageSendAndRecvIT.java | 52 +++++++++++++++++----- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java index f3b30b5af..1876cee64 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java @@ -17,18 +17,18 @@ package org.apache.rocketmq.test.smoke; -import java.time.Duration; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; +import com.google.common.collect.ImmutableList; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.base.IntegrationTestBase; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; @@ -39,6 +39,11 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; import static com.google.common.truth.Truth.assertThat; @@ -112,11 +117,38 @@ public class NormalMessageSendAndRecvIT extends BaseConf { @Test public void testSynSendMessageWhenEnableBuildConsumeQueueConcurrently() throws Exception { - Properties properties = new Properties(); - properties.setProperty("enableBuildConsumeQueueConcurrently", "true"); - defaultMQAdminExt.updateBrokerConfig(brokerController1.getBrokerAddr(), properties); - defaultMQAdminExt.updateBrokerConfig(brokerController2.getBrokerAddr(), properties); - defaultMQAdminExt.updateBrokerConfig(brokerController3.getBrokerAddr(), properties); + resetStoreConfigWithEnableBuildConsumeQueueConcurrently(true); + testSynSendMessage(); + + resetStoreConfigWithEnableBuildConsumeQueueConcurrently(false); + } + + void resetStoreConfigWithEnableBuildConsumeQueueConcurrently(boolean enableBuildConsumeQueueConcurrently) { + { + brokerController1.shutdown(); + MessageStoreConfig storeConfig = brokerController1.getMessageStoreConfig(); + BrokerConfig brokerConfig = brokerController1.getBrokerConfig(); + storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently); + brokerController1 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig); + } + { + brokerController2.shutdown(); + MessageStoreConfig storeConfig = brokerController2.getMessageStoreConfig(); + BrokerConfig brokerConfig = brokerController2.getBrokerConfig(); + storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently); + brokerController2 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig); + } + { + brokerController3.shutdown(); + MessageStoreConfig storeConfig = brokerController3.getMessageStoreConfig(); + BrokerConfig brokerConfig = brokerController3.getBrokerConfig(); + storeConfig.setEnableBuildConsumeQueueConcurrently(enableBuildConsumeQueueConcurrently); + brokerController3 = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig); + } + brokerControllerList = ImmutableList.of(brokerController1, brokerController2, brokerController3); + brokerControllerMap = brokerControllerList.stream().collect( + Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity())); } + }
