This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 59e8f9b66e [ISSUE #7644] Optimize client rebalance
59e8f9b66e is described below

commit 59e8f9b66ede2f02ec40a0c58fd5e5c2bd6d59e5
Author: Zhouxiang Zhan <zhouxz...@apache.org>
AuthorDate: Wed Dec 27 10:42:50 2023 +0800

    [ISSUE #7644] Optimize client rebalance
---
 .../consumer/ConsumeMessageOrderlyService.java     |  4 +-
 .../impl/consumer/DefaultLitePullConsumerImpl.java |  8 +++
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |  8 +++
 .../impl/consumer/DefaultMQPushConsumerImpl.java   |  9 +++
 .../client/impl/consumer/MQConsumerInner.java      |  2 +
 .../client/impl/consumer/ProcessQueue.java         |  8 +--
 .../client/impl/consumer/RebalanceImpl.java        | 10 ++-
 .../client/impl/consumer/RebalancePushImpl.java    | 72 +++++++++++-----------
 .../client/impl/consumer/RebalanceService.java     | 17 ++++-
 .../client/impl/factory/MQClientInstance.java      |  9 ++-
 10 files changed, 97 insertions(+), 50 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 4246768d40..cab4fe5d69 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -488,7 +488,7 @@ public class ConsumeMessageOrderlyService implements 
ConsumeMessageService {
                             ConsumeReturnType returnType = 
ConsumeReturnType.SUCCESS;
                             boolean hasException = false;
                             try {
-                                this.processQueue.getConsumeLock().lock();
+                                
this.processQueue.getConsumeLock().readLock().lock();
                                 if (this.processQueue.isDropped()) {
                                     log.warn("consumeMessage, the message 
queue not be able to consume, because it's dropped. {}",
                                         this.messageQueue);
@@ -504,7 +504,7 @@ public class ConsumeMessageOrderlyService implements 
ConsumeMessageService {
                                     messageQueue), e);
                                 hasException = true;
                             } finally {
-                                this.processQueue.getConsumeLock().unlock();
+                                
this.processQueue.getConsumeLock().readLock().unlock();
                             }
 
                             if (null == status
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 20ca477008..9350970a07 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -1121,6 +1121,14 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         }
     }
 
+    @Override
+    public boolean tryRebalance() {
+        if (this.rebalanceImpl != null) {
+            return this.rebalanceImpl.doRebalance(false);
+        }
+        return false;
+    }
+
     @Override
     public void persistConsumerOffset() {
         try {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index e6d148c7f6..f5d326071d 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -386,6 +386,14 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
         }
     }
 
+    @Override
+    public boolean tryRebalance() {
+        if (this.rebalanceImpl != null) {
+            return this.rebalanceImpl.doRebalance(false);
+        }
+        return false;
+    }
+
     @Override
     public void persistConsumerOffset() {
         try {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 15563a4f0e..d2faed3783 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -417,6 +417,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
 
                                         // removeProcessQueue will also remove 
offset to cancel the frozen status.
                                         
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
+                                        
DefaultMQPushConsumerImpl.this.rebalanceImpl.getmQClientFactory().rebalanceImmediately();
 
                                         log.warn("fix the pull request offset, 
{}", pullRequest);
                                     } catch (Throwable e) {
@@ -1375,6 +1376,14 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         }
     }
 
+    @Override
+    public boolean tryRebalance() {
+        if (!this.pause) {
+            return this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
+        }
+        return false;
+    }
+
     @Override
     public void persistConsumerOffset() {
         try {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
index 7e84b508b1..8fc1cc9059 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
@@ -40,6 +40,8 @@ public interface MQConsumerInner {
 
     void doRebalance();
 
+    boolean tryRebalance();
+
     void persistConsumerOffset();
 
     void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> 
info);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index ab94a98467..ebc208a8d8 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -22,18 +22,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
 
 /**
  * Queue consumption snapshot
@@ -48,7 +46,7 @@ public class ProcessQueue {
     private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();
     private final AtomicLong msgCount = new AtomicLong();
     private final AtomicLong msgSize = new AtomicLong();
-    private final Lock consumeLock = new ReentrantLock();
+    private final ReadWriteLock consumeLock = new ReentrantReadWriteLock();
     /**
      * A subset of msgTreeMap, will only be used when orderly consume
      */
@@ -392,7 +390,7 @@ public class ProcessQueue {
         this.lastLockTimestamp = lastLockTimestamp;
     }
 
-    public Lock getConsumeLock() {
+    public ReadWriteLock getConsumeLock() {
         return consumeLock;
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 97d9460f82..53addc5f50 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -242,9 +242,15 @@ public abstract class RebalanceImpl {
                 final String topic = entry.getKey();
                 try {
                     if (!clientRebalance(topic) && tryQueryAssignment(topic)) {
-                        balanced = this.getRebalanceResultFromBroker(topic, 
isOrder);
+                        boolean result = 
this.getRebalanceResultFromBroker(topic, isOrder);
+                        if (!result) {
+                            balanced = false;
+                        }
                     } else {
-                        balanced = this.rebalanceByTopic(topic, isOrder);
+                        boolean result = this.rebalanceByTopic(topic, isOrder);
+                        if (!result) {
+                            balanced = false;
+                        }
                     }
                 } catch (Throwable e) {
                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index f9cf429c69..f28890d306 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -91,32 +91,47 @@ public class RebalancePushImpl extends RebalanceImpl {
     }
 
     @Override
-    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue 
pq) {
-        this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
-        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
+    public boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final 
ProcessQueue pq) {
         if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
             && 
MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
-            try {
-                if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
-                    try {
-                        return this.unlockDelay(mq, pq);
-                    } finally {
-                        pq.getConsumeLock().unlock();
-                    }
-                } else {
-                    log.warn("[WRONG]mq is consuming, so can not unlock it, 
{}. maybe hanged for a while, {}",
-                        mq,
-                        pq.getTryUnlockTimes());
 
-                    pq.incTryUnlockTimes();
+            // commit offset immediately
+            this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
+
+            // remove order message queue: unlock & remove
+            return tryRemoveOrderMessageQueue(mq, pq);
+        } else {
+            this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
+            this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
+            return true;
+        }
+    }
+
+    private boolean tryRemoveOrderMessageQueue(final MessageQueue mq, final 
ProcessQueue pq) {
+        try {
+            // unlock & remove when no message is consuming or 
UNLOCK_DELAY_TIME_MILLS timeout (Backwards compatibility)
+            boolean forceUnlock = pq.isDropped() && System.currentTimeMillis() 
> pq.getLastLockTimestamp() + UNLOCK_DELAY_TIME_MILLS;
+            if (forceUnlock || pq.getConsumeLock().writeLock().tryLock(500, 
TimeUnit.MILLISECONDS)) {
+                try {
+                    
RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
+                    
RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
+
+                    pq.setLocked(false);
+                    RebalancePushImpl.this.unlock(mq, true);
+                    return true;
+                } finally {
+                    if (!forceUnlock) {
+                        pq.getConsumeLock().writeLock().unlock();
+                    }
                 }
-            } catch (Exception e) {
-                log.error("removeUnnecessaryMessageQueue Exception", e);
+            } else {
+                pq.incTryUnlockTimes();
             }
-
-            return false;
+        } catch (Exception e) {
+            pq.incTryUnlockTimes();
         }
-        return true;
+
+        return false;
     }
 
     @Override
@@ -129,23 +144,6 @@ public class RebalancePushImpl extends RebalanceImpl {
         return true;
     }
 
-    private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
-
-        if (pq.hasTempMessage()) {
-            log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
-            
this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new
 Runnable() {
-                @Override
-                public void run() {
-                    log.info("[{}]unlockDelay, execute at once {}", 
mq.hashCode(), mq);
-                    RebalancePushImpl.this.unlock(mq, true);
-                }
-            }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
-        } else {
-            this.unlock(mq, true);
-        }
-        return true;
-    }
-
     @Override
     public ConsumeType consumeType() {
         return ConsumeType.CONSUME_PASSIVELY;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
index 56f589d519..8e586c85fe 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
@@ -25,8 +25,12 @@ public class RebalanceService extends ServiceThread {
     private static long waitInterval =
         Long.parseLong(System.getProperty(
             "rocketmq.client.rebalance.waitInterval", "20000"));
+    private static long minInterval =
+        Long.parseLong(System.getProperty(
+            "rocketmq.client.rebalance.minInterval", "1000"));
     private final Logger log = LoggerFactory.getLogger(RebalanceService.class);
     private final MQClientInstance mqClientFactory;
+    private long lastRebalanceTimestamp = System.currentTimeMillis();
 
     public RebalanceService(MQClientInstance mqClientFactory) {
         this.mqClientFactory = mqClientFactory;
@@ -36,9 +40,18 @@ public class RebalanceService extends ServiceThread {
     public void run() {
         log.info(this.getServiceName() + " service started");
 
+        long realWaitInterval = waitInterval;
         while (!this.isStopped()) {
-            this.waitForRunning(waitInterval);
-            this.mqClientFactory.doRebalance();
+            this.waitForRunning(realWaitInterval);
+
+            long interval = System.currentTimeMillis() - 
lastRebalanceTimestamp;
+            if (interval < minInterval) {
+                realWaitInterval = minInterval - interval;
+            } else {
+                boolean balanced = this.mqClientFactory.doRebalance();
+                realWaitInterval = balanced ? waitInterval : minInterval;
+                lastRebalanceTimestamp = System.currentTimeMillis();
+            }
         }
 
         log.info(this.getServiceName() + " service end");
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index ad39372d35..436782efd3 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1060,17 +1060,22 @@ public class MQClientInstance {
         this.rebalanceService.wakeup();
     }
 
-    public void doRebalance() {
+    public boolean doRebalance() {
+        boolean balanced = true;
         for (Map.Entry<String, MQConsumerInner> entry : 
this.consumerTable.entrySet()) {
             MQConsumerInner impl = entry.getValue();
             if (impl != null) {
                 try {
-                    impl.doRebalance();
+                    if (!impl.tryRebalance()) {
+                        balanced = false;
+                    }
                 } catch (Throwable e) {
                     log.error("doRebalance exception", e);
                 }
             }
         }
+
+        return balanced;
     }
 
     public MQProducerInner selectProducer(final String group) {

Reply via email to