This is an automated email from the ASF dual-hosted git repository. yuzhou pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 017b7537d2 [ISSUE #8227] Optimize DefaultMQPushConsumer construction method (#8228) 017b7537d2 is described below commit 017b7537d2d02fc9a5815eac1f19b8060003fcf4 Author: yx9o <yangx_s...@163.com> AuthorDate: Thu Jun 13 12:09:11 2024 +0800 [ISSUE #8227] Optimize DefaultMQPushConsumer construction method (#8228) --- .../client/consumer/DefaultMQPushConsumer.java | 23 ++++----- .../client/consumer/DefaultMQPushConsumerTest.java | 58 ++++++++++++++++------ 2 files changed, 51 insertions(+), 30 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 312f4632ca..38a412c237 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -16,10 +16,6 @@ */ package org.apache.rocketmq.client.consumer; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.consumer.listener.MessageListener; @@ -40,12 +36,17 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.NamespaceUtil; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** * In most scenarios, this is the mostly recommended class to consume messages. @@ -328,10 +329,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * @param rpcHook RPC hook to execute before each remoting command. */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) { - this.consumerGroup = consumerGroup; - this.rpcHook = rpcHook; - this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); - defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); + this(consumerGroup, rpcHook, new AllocateMessageQueueAveragely()); } @@ -355,10 +353,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { - this.consumerGroup = consumerGroup; - this.rpcHook = rpcHook; - this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; - defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); + this(consumerGroup, rpcHook, allocateMessageQueueStrategy, false, null); } /** diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 3943b92289..a10fd74b34 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -16,20 +16,6 @@ */ package org.apache.rocketmq.client.consumer; -import java.io.ByteArrayOutputStream; -import java.lang.reflect.Field; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -37,6 +23,8 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.CommunicationMode; @@ -53,6 +41,7 @@ import org.apache.rocketmq.client.impl.consumer.PullRequest; import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -62,7 +51,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -71,8 +59,27 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; @@ -80,6 +87,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -206,7 +214,7 @@ public class DefaultMQPushConsumerTest { @Test public void testStart_OffsetShouldNotNUllAfterStart() { - Assert.assertNotNull(pushConsumer.getOffsetStore()); + assertNotNull(pushConsumer.getOffsetStore()); } @Test @@ -388,4 +396,22 @@ public class DefaultMQPushConsumerTest { pullMessageService.executePullRequestImmediately(createPullRequest()); assertThat(messageExts[0]).isNull(); } + + @Test + public void assertCreatePushConsumer() { + DefaultMQPushConsumer pushConsumer1 = new DefaultMQPushConsumer(consumerGroup, mock(RPCHook.class)); + assertNotNull(pushConsumer1); + assertEquals(consumerGroup, pushConsumer1.getConsumerGroup()); + assertTrue(pushConsumer1.getAllocateMessageQueueStrategy() instanceof AllocateMessageQueueAveragely); + assertNotNull(pushConsumer1.defaultMQPushConsumerImpl); + assertFalse(pushConsumer1.isEnableTrace()); + assertTrue(UtilAll.isBlank(pushConsumer1.getTraceTopic())); + DefaultMQPushConsumer pushConsumer2 = new DefaultMQPushConsumer(consumerGroup, mock(RPCHook.class), new AllocateMessageQueueAveragelyByCircle()); + assertNotNull(pushConsumer2); + assertEquals(consumerGroup, pushConsumer2.getConsumerGroup()); + assertTrue(pushConsumer2.getAllocateMessageQueueStrategy() instanceof AllocateMessageQueueAveragelyByCircle); + assertNotNull(pushConsumer2.defaultMQPushConsumerImpl); + assertFalse(pushConsumer2.isEnableTrace()); + assertTrue(UtilAll.isBlank(pushConsumer2.getTraceTopic())); + } }