This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 3d357bb3ed [ISSUE #7772] Ensuring broker protection capabilities when POP does not return ACK (#7773) 3d357bb3ed is described below commit 3d357bb3ed4dcee38cdde9b1525b15226820321a Author: Ji Juntao <juntao....@alibaba-inc.com> AuthorDate: Fri Jan 26 14:34:53 2024 +0800 [ISSUE #7772] Ensuring broker protection capabilities when POP does not return ACK (#7773) * add stop pop situation. * refactor the imports * modify the threadhold into threshold. * checkstyle --- .../broker/processor/PopMessageProcessor.java | 12 ++++++++++++ .../java/org/apache/rocketmq/common/BrokerConfig.java | 19 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 02e266a786..105e11643f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -519,6 +519,13 @@ public class PopMessageProcessor implements NettyRequestProcessor { return future; } + if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) { + POP_LOGGER.warn("Too much msgs unacked, then stop poping. topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), queueId); + restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; + future.complete(restNum); + return future; + } + try { future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(), @@ -667,6 +674,11 @@ public class PopMessageProcessor implements NettyRequestProcessor { }); } + private boolean isPopShouldStop(String topic, String group, int queueId) { + return brokerController.getBrokerConfig().isEnablePopMessageThreshold() && + brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic, group, queueId) > brokerController.getBrokerConfig().getPopInflightMessageThreshold(); + } + private long getPopOffset(String topic, String group, int queueId, int initMode, boolean init, String lockKey, boolean checkResetOffset) { diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 0a2c528f86..0a1bfa5d67 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -407,6 +407,9 @@ public class BrokerConfig extends BrokerIdentity { */ private boolean enableSplitRegistration = false; + private long popInflightMessageThreshold = 10000; + private boolean enablePopMessageThreshold = false; + private int splitRegistrationSize = 800; /** @@ -1799,4 +1802,20 @@ public class BrokerConfig extends BrokerIdentity { public void setTransactionMetricFlushInterval(long transactionMetricFlushInterval) { this.transactionMetricFlushInterval = transactionMetricFlushInterval; } + + public long getPopInflightMessageThreshold() { + return popInflightMessageThreshold; + } + + public void setPopInflightMessageThreshold(long popInflightMessageThreshold) { + this.popInflightMessageThreshold = popInflightMessageThreshold; + } + + public boolean isEnablePopMessageThreshold() { + return enablePopMessageThreshold; + } + + public void setEnablePopMessageThreshold(boolean enablePopMessageThreshold) { + this.enablePopMessageThreshold = enablePopMessageThreshold; + } }