This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9202de34c3 [ISSUE #8933] feat: DefaultPullConsumer add balance switch (#8934)
9202de34c3 is described below

commit 9202de34c30db004db26f4976f165595af1b8bd3
Author: Humkum <1109939...@qq.com>
AuthorDate: Wed Nov 20 14:58:30 2024 +0800

    [ISSUE #8933] feat: DefaultPullConsumer add balance switch (#8934)
---
 .../apache/rocketmq/client/consumer/DefaultMQPullConsumer.java | 10 ++++++++++
 .../client/impl/consumer/DefaultMQPullConsumerImpl.java        |  7 +++++++
 2 files changed, 17 insertions(+)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 7c9a65ecdb..9e7a86d9b4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -88,6 +88,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
 
     private int maxReconsumeTimes = 16;
 
+    private boolean enableRebalance = true;
+
     public DefaultMQPullConsumer() {
         this(MixAll.DEFAULT_CONSUMER_GROUP, null);
     }
@@ -468,4 +470,12 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
     public void persist(MessageQueue mq) {
         this.getOffsetStore().persist(queueWithNamespace(mq));
     }
+
+    public boolean isEnableRebalance() {
+        return enableRebalance;
+    }
+
+    public void setEnableRebalance(boolean enableRebalance) {
+        this.enableRebalance = enableRebalance;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 9a8ea8fb4f..e05c614c6d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -381,6 +381,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
     @Override
     public void doRebalance() {
+        if (!defaultMQPullConsumer.isEnableRebalance()) {
+            return;
+        }
         if (this.rebalanceImpl != null) {
             this.rebalanceImpl.doRebalance(false);
         }
@@ -388,6 +391,10 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
     @Override
     public boolean tryRebalance() {
+        if (!defaultMQPullConsumer.isEnableRebalance()) {
+            return true;
+        }
+
         if (this.rebalanceImpl != null) {
             return this.rebalanceImpl.doRebalance(false);
         }

Reply via email to