This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 59e8f9b66e [ISSUE #7644] Optimize client rebalance 59e8f9b66e is described below commit 59e8f9b66ede2f02ec40a0c58fd5e5c2bd6d59e5 Author: Zhouxiang Zhan <zhouxz...@apache.org> AuthorDate: Wed Dec 27 10:42:50 2023 +0800 [ISSUE #7644] Optimize client rebalance --- .../consumer/ConsumeMessageOrderlyService.java | 4 +- .../impl/consumer/DefaultLitePullConsumerImpl.java | 8 +++ .../impl/consumer/DefaultMQPullConsumerImpl.java | 8 +++ .../impl/consumer/DefaultMQPushConsumerImpl.java | 9 +++ .../client/impl/consumer/MQConsumerInner.java | 2 + .../client/impl/consumer/ProcessQueue.java | 8 +-- .../client/impl/consumer/RebalanceImpl.java | 10 ++- .../client/impl/consumer/RebalancePushImpl.java | 72 +++++++++++----------- .../client/impl/consumer/RebalanceService.java | 17 ++++- .../client/impl/factory/MQClientInstance.java | 9 ++- 10 files changed, 97 insertions(+), 50 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 4246768d40..cab4fe5d69 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -488,7 +488,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { - this.processQueue.getConsumeLock().lock(); + this.processQueue.getConsumeLock().readLock().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. 
{}", this.messageQueue); @@ -504,7 +504,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { messageQueue), e); hasException = true; } finally { - this.processQueue.getConsumeLock().unlock(); + this.processQueue.getConsumeLock().readLock().unlock(); } if (null == status diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 20ca477008..9350970a07 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -1121,6 +1121,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } + @Override + public boolean tryRebalance() { + if (this.rebalanceImpl != null) { + return this.rebalanceImpl.doRebalance(false); + } + return false; + } + @Override public void persistConsumerOffset() { try { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index e6d148c7f6..f5d326071d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -386,6 +386,14 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } } + @Override + public boolean tryRebalance() { + if (this.rebalanceImpl != null) { + return this.rebalanceImpl.doRebalance(false); + } + return false; + } + @Override public void persistConsumerOffset() { try { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 
15563a4f0e..d2faed3783 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -417,6 +417,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { // removeProcessQueue will also remove offset to cancel the frozen status. DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); + DefaultMQPushConsumerImpl.this.rebalanceImpl.getmQClientFactory().rebalanceImmediately(); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { @@ -1375,6 +1376,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } + @Override + public boolean tryRebalance() { + if (!this.pause) { + return this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); + } + return false; + } + @Override public void persistConsumerOffset() { try { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java index 7e84b508b1..8fc1cc9059 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java @@ -40,6 +40,8 @@ public interface MQConsumerInner { void doRebalance(); + boolean tryRebalance(); + void persistConsumerOffset(); void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index ab94a98467..ebc208a8d8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -22,18 +22,16 @@ import 
java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo; /** * Queue consumption snapshot @@ -48,7 +46,7 @@ public class ProcessQueue { private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>(); private final AtomicLong msgCount = new AtomicLong(); private final AtomicLong msgSize = new AtomicLong(); - private final Lock consumeLock = new ReentrantLock(); + private final ReadWriteLock consumeLock = new ReentrantReadWriteLock(); /** * A subset of msgTreeMap, will only be used when orderly consume */ @@ -392,7 +390,7 @@ public class ProcessQueue { this.lastLockTimestamp = lastLockTimestamp; } - public Lock getConsumeLock() { + public ReadWriteLock getConsumeLock() { return consumeLock; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 97d9460f82..53addc5f50 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -242,9 +242,15 @@ public abstract class RebalanceImpl { final String topic = entry.getKey(); try { if 
(!clientRebalance(topic) && tryQueryAssignment(topic)) { - balanced = this.getRebalanceResultFromBroker(topic, isOrder); + boolean result = this.getRebalanceResultFromBroker(topic, isOrder); + if (!result) { + balanced = false; + } } else { - balanced = this.rebalanceByTopic(topic, isOrder); + boolean result = this.rebalanceByTopic(topic, isOrder); + if (!result) { + balanced = false; + } } } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index f9cf429c69..f28890d306 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -91,32 +91,47 @@ public class RebalancePushImpl extends RebalanceImpl { } @Override - public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { - this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); - this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); + public boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq) { if (this.defaultMQPushConsumerImpl.isConsumeOrderly() && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { - try { - if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) { - try { - return this.unlockDelay(mq, pq); - } finally { - pq.getConsumeLock().unlock(); - } - } else { - log.warn("[WRONG]mq is consuming, so can not unlock it, {}. 
maybe hanged for a while, {}", - mq, - pq.getTryUnlockTimes()); - pq.incTryUnlockTimes(); + // commit offset immediately + this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); + + // remove order message queue: unlock & remove + return tryRemoveOrderMessageQueue(mq, pq); + } else { + this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); + this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); + return true; + } + } + + private boolean tryRemoveOrderMessageQueue(final MessageQueue mq, final ProcessQueue pq) { + try { + // unlock & remove when no message is consuming or UNLOCK_DELAY_TIME_MILLS timeout (Backwards compatibility) + boolean forceUnlock = pq.isDropped() && System.currentTimeMillis() > pq.getLastLockTimestamp() + UNLOCK_DELAY_TIME_MILLS; + if (forceUnlock || pq.getConsumeLock().writeLock().tryLock(500, TimeUnit.MILLISECONDS)) { + try { + RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); + RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); + + pq.setLocked(false); + RebalancePushImpl.this.unlock(mq, true); + return true; + } finally { + if (!forceUnlock) { + pq.getConsumeLock().writeLock().unlock(); + } } - } catch (Exception e) { - log.error("removeUnnecessaryMessageQueue Exception", e); + } else { + pq.incTryUnlockTimes(); } - - return false; + } catch (Exception e) { + pq.incTryUnlockTimes(); } - return true; + + return false; } @Override @@ -129,23 +144,6 @@ public class RebalancePushImpl extends RebalanceImpl { return true; } - private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) { - - if (pq.hasTempMessage()) { - log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq); - this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() { - @Override - public void run() { - log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq); - RebalancePushImpl.this.unlock(mq, true); - } - }, 
UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS); - } else { - this.unlock(mq, true); - } - return true; - } - @Override public ConsumeType consumeType() { return ConsumeType.CONSUME_PASSIVELY; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java index 56f589d519..8e586c85fe 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java @@ -25,8 +25,12 @@ public class RebalanceService extends ServiceThread { private static long waitInterval = Long.parseLong(System.getProperty( "rocketmq.client.rebalance.waitInterval", "20000")); + private static long minInterval = + Long.parseLong(System.getProperty( + "rocketmq.client.rebalance.minInterval", "1000")); private final Logger log = LoggerFactory.getLogger(RebalanceService.class); private final MQClientInstance mqClientFactory; + private long lastRebalanceTimestamp = System.currentTimeMillis(); public RebalanceService(MQClientInstance mqClientFactory) { this.mqClientFactory = mqClientFactory; @@ -36,9 +40,18 @@ public class RebalanceService extends ServiceThread { public void run() { log.info(this.getServiceName() + " service started"); + long realWaitInterval = waitInterval; while (!this.isStopped()) { - this.waitForRunning(waitInterval); - this.mqClientFactory.doRebalance(); + this.waitForRunning(realWaitInterval); + + long interval = System.currentTimeMillis() - lastRebalanceTimestamp; + if (interval < minInterval) { + realWaitInterval = minInterval - interval; + } else { + boolean balanced = this.mqClientFactory.doRebalance(); + realWaitInterval = balanced ? 
waitInterval : minInterval; + lastRebalanceTimestamp = System.currentTimeMillis(); + } } log.info(this.getServiceName() + " service end"); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index ad39372d35..436782efd3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1060,17 +1060,22 @@ public class MQClientInstance { this.rebalanceService.wakeup(); } - public void doRebalance() { + public boolean doRebalance() { + boolean balanced = true; for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { - impl.doRebalance(); + if (!impl.tryRebalance()) { + balanced = false; + } } catch (Throwable e) { log.error("doRebalance exception", e); } } } + + return balanced; } public MQProducerInner selectProducer(final String group) {