This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1a15729ca9 [ISSUE #8601]When isPopShouldStop hit,unlock queueLockManager (#8602)
1a15729ca9 is described below

commit 1a15729ca962d76ffe044f6332ec711b1d7546bc
Author: Lei Zhiyuan <leizhiy...@gmail.com>
AuthorDate: Fri Aug 30 13:43:01 2024 +0800

    [ISSUE #8601]When isPopShouldStop hit,unlock queueLockManager (#8602)
    
    * fix:when isPopShouldStop hit, unlock queueLockManager
    
    * fix:when isPopShouldStop hit, unlock queueLockManager
    
    * fix: limit rate of appending commit in case of DLedger commit-log
    
    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    Co-authored-by: Zhanhui Li <lizhan...@gmail.com>
---
 .../org/apache/rocketmq/broker/processor/PopMessageProcessor.java  | 2 +-
 .../org/apache/rocketmq/store/dledger/MessageStoreTestBase.java    | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 47ef8e4013..5430fdec94 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -540,6 +540,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             return future;
         }
 
+        future.whenComplete((result, throwable) -> 
queueLockManager.unLock(lockKey));
         if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) 
{
             POP_LOGGER.warn("Too much msgs unacked, then stop poping. 
topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), 
queueId);
             restNum = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 
offset + restNum;
@@ -548,7 +549,6 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         }
 
         try {
-            future.whenComplete((result, throwable) -> 
queueLockManager.unLock(lockKey));
             offset = getPopOffset(topic, requestHeader.getConsumerGroup(), 
queueId, requestHeader.getInitMode(),
                 true, lockKey, true);
 
diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
index a21806ffcf..c4d9f0727b 100644
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.store.dledger;
 
+import com.google.common.util.concurrent.RateLimiter;
 import io.openmessaging.storage.dledger.DLedgerConfig;
 import io.openmessaging.storage.dledger.DLedgerServer;
 import java.io.File;
@@ -122,7 +123,13 @@ public class MessageStoreTestBase extends StoreTestBase {
     }
 
     protected void doPutMessages(MessageStore messageStore, String topic, int 
queueId, int num, long beginLogicsOffset) throws UnknownHostException {
+        RateLimiter rateLimiter = RateLimiter.create(100);
+        MessageStoreConfig storeConfig = messageStore.getMessageStoreConfig();
+        boolean limitAppendRate = storeConfig.isEnableDLegerCommitLog();
         for (int i = 0; i < num; i++) {
+            if (limitAppendRate) {
+                rateLimiter.acquire();
+            }
             MessageExtBrokerInner msgInner = buildMessage();
             msgInner.setTopic(topic);
             msgInner.setQueueId(queueId);

Reply via email to