This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 017b7537d2 [ISSUE #8227] Optimize DefaultMQPushConsumer construction 
method (#8228)
017b7537d2 is described below

commit 017b7537d2d02fc9a5815eac1f19b8060003fcf4
Author: yx9o <yangx_s...@163.com>
AuthorDate: Thu Jun 13 12:09:11 2024 +0800

    [ISSUE #8227] Optimize DefaultMQPushConsumer construction method (#8228)
---
 .../client/consumer/DefaultMQPushConsumer.java     | 23 ++++-----
 .../client/consumer/DefaultMQPushConsumerTest.java | 58 ++++++++++++++++------
 2 files changed, 51 insertions(+), 30 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 312f4632ca..38a412c237 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -16,10 +16,6 @@
  */
 package org.apache.rocketmq.client.consumer;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -40,12 +36,17 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
  * In most scenarios, this is the mostly recommended class to consume messages.
@@ -328,10 +329,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      * @param rpcHook RPC hook to execute before each remoting command.
      */
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
-        this.consumerGroup = consumerGroup;
-        this.rpcHook = rpcHook;
-        this.allocateMessageQueueStrategy = new 
AllocateMessageQueueAveragely();
-        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
+        this(consumerGroup, rpcHook, new AllocateMessageQueueAveragely());
     }
 
 
@@ -355,10 +353,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      */
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
         AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
-        this.consumerGroup = consumerGroup;
-        this.rpcHook = rpcHook;
-        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
-        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
+        this(consumerGroup, rpcHook, allocateMessageQueueStrategy, false, 
null);
     }
 
     /**
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 3943b92289..a10fd74b34 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -16,20 +16,6 @@
  */
 package org.apache.rocketmq.client.consumer;
 
-import java.io.ByteArrayOutputStream;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -37,6 +23,8 @@ import 
org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -53,6 +41,7 @@ import org.apache.rocketmq.client.impl.consumer.PullRequest;
 import org.apache.rocketmq.client.impl.consumer.PullResultExt;
 import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -62,7 +51,6 @@ import 
org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -71,8 +59,27 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -80,6 +87,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
@@ -206,7 +214,7 @@ public class DefaultMQPushConsumerTest {
 
     @Test
     public void testStart_OffsetShouldNotNUllAfterStart() {
-        Assert.assertNotNull(pushConsumer.getOffsetStore());
+        assertNotNull(pushConsumer.getOffsetStore());
     }
 
     @Test
@@ -388,4 +396,22 @@ public class DefaultMQPushConsumerTest {
         pullMessageService.executePullRequestImmediately(createPullRequest());
         assertThat(messageExts[0]).isNull();
     }
+
+    @Test
+    public void assertCreatePushConsumer() {
+        DefaultMQPushConsumer pushConsumer1 = new 
DefaultMQPushConsumer(consumerGroup, mock(RPCHook.class));
+        assertNotNull(pushConsumer1);
+        assertEquals(consumerGroup, pushConsumer1.getConsumerGroup());
+        assertTrue(pushConsumer1.getAllocateMessageQueueStrategy() instanceof 
AllocateMessageQueueAveragely);
+        assertNotNull(pushConsumer1.defaultMQPushConsumerImpl);
+        assertFalse(pushConsumer1.isEnableTrace());
+        assertTrue(UtilAll.isBlank(pushConsumer1.getTraceTopic()));
+        DefaultMQPushConsumer pushConsumer2 = new 
DefaultMQPushConsumer(consumerGroup, mock(RPCHook.class), new 
AllocateMessageQueueAveragelyByCircle());
+        assertNotNull(pushConsumer2);
+        assertEquals(consumerGroup, pushConsumer2.getConsumerGroup());
+        assertTrue(pushConsumer2.getAllocateMessageQueueStrategy() instanceof 
AllocateMessageQueueAveragelyByCircle);
+        assertNotNull(pushConsumer2.defaultMQPushConsumerImpl);
+        assertFalse(pushConsumer2.isEnableTrace());
+        assertTrue(UtilAll.isBlank(pushConsumer2.getTraceTopic()));
+    }
 }

Reply via email to