This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new 9202de34c3 [ISSUE #8933] feat: DefaultPullConsumer add balance switch (#8934)
9202de34c3 is described below

commit 9202de34c30db004db26f4976f165595af1b8bd3
Author: Humkum <1109939...@qq.com>
AuthorDate: Wed Nov 20 14:58:30 2024 +0800

    [ISSUE #8933] feat: DefaultPullConsumer add balance switch (#8934)
---
 .../apache/rocketmq/client/consumer/DefaultMQPullConsumer.java | 10 ++++++++++
 .../client/impl/consumer/DefaultMQPullConsumerImpl.java | 7 +++++++
 2 files changed, 17 insertions(+)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 7c9a65ecdb..9e7a86d9b4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -88,6 +88,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
 
     private int maxReconsumeTimes = 16;
 
+    private boolean enableRebalance = true;
+
     public DefaultMQPullConsumer() {
         this(MixAll.DEFAULT_CONSUMER_GROUP, null);
     }
@@ -468,4 +470,12 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
     public void persist(MessageQueue mq) {
         this.getOffsetStore().persist(queueWithNamespace(mq));
     }
+
+    public boolean isEnableRebalance() {
+        return enableRebalance;
+    }
+
+    public void setEnableRebalance(boolean enableRebalance) {
+        this.enableRebalance = enableRebalance;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 9a8ea8fb4f..e05c614c6d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -381,6 +381,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
     @Override
     public void doRebalance() {
+        if (!defaultMQPullConsumer.isEnableRebalance()) {
+            return;
+        }
         if (this.rebalanceImpl != null) {
             this.rebalanceImpl.doRebalance(false);
         }
@@ -388,6 +391,10 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
     @Override
     public boolean tryRebalance() {
+        if (!defaultMQPullConsumer.isEnableRebalance()) {
+            return true;
+        }
+
         if (this.rebalanceImpl != null) {
             return this.rebalanceImpl.doRebalance(false);
         }