This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new 1a15729ca9 [ISSUE #8601]When isPopShouldStop hit,unlock queueLockManager (#8602)
1a15729ca9 is described below

commit 1a15729ca962d76ffe044f6332ec711b1d7546bc
Author: Lei Zhiyuan <leizhiy...@gmail.com>
AuthorDate: Fri Aug 30 13:43:01 2024 +0800

    [ISSUE #8601]When isPopShouldStop hit,unlock queueLockManager (#8602)

    * fix:when isPopShouldStop hit, unlock queueLockManager

    * fix:when isPopShouldStop hit, unlock queueLockManager

    * fix: limit rate of appending commit in case of DLedger commit-log

    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>

    ---------

    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    Co-authored-by: Zhanhui Li <lizhan...@gmail.com>
---
 .../org/apache/rocketmq/broker/processor/PopMessageProcessor.java | 2 +-
 .../org/apache/rocketmq/store/dledger/MessageStoreTestBase.java | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 47ef8e4013..5430fdec94 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -540,6 +540,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             return future;
         }
 
+        future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
         if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) {
             POP_LOGGER.warn("Too much msgs unacked, then stop poping. topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), queueId);
             restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
@@ -548,7 +549,6 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         }
 
         try {
-            future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
             offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
                 true, lockKey, true);
 
diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
index a21806ffcf..c4d9f0727b 100644
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.store.dledger;
 
+import com.google.common.util.concurrent.RateLimiter;
 import io.openmessaging.storage.dledger.DLedgerConfig;
 import io.openmessaging.storage.dledger.DLedgerServer;
 import java.io.File;
@@ -122,7 +123,13 @@ public class MessageStoreTestBase extends StoreTestBase {
     }
 
     protected void doPutMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) throws UnknownHostException {
+        RateLimiter rateLimiter = RateLimiter.create(100);
+        MessageStoreConfig storeConfig = messageStore.getMessageStoreConfig();
+        boolean limitAppendRate = storeConfig.isEnableDLegerCommitLog();
         for (int i = 0; i < num; i++) {
+            if (limitAppendRate) {
+                rateLimiter.acquire();
+            }
             MessageExtBrokerInner msgInner = buildMessage();
             msgInner.setTopic(topic);
             msgInner.setQueueId(queueId);