This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3d357bb3ed [ISSUE #7772] Ensuring broker protection capabilities when POP does not return ACK (#7773)
3d357bb3ed is described below

commit 3d357bb3ed4dcee38cdde9b1525b15226820321a
Author: Ji Juntao <juntao....@alibaba-inc.com>
AuthorDate: Fri Jan 26 14:34:53 2024 +0800

    [ISSUE #7772] Ensuring broker protection capabilities when POP does not return ACK (#7773)
    
    * add stop pop situation.
    
    * refactor the imports
    
    * modify the threadhold into threshold.
    
    * checkstyle
---
 .../broker/processor/PopMessageProcessor.java         | 12 ++++++++++++
 .../java/org/apache/rocketmq/common/BrokerConfig.java | 19 +++++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 02e266a786..105e11643f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -519,6 +519,13 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             return future;
         }
 
+        if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) 
{
+            POP_LOGGER.warn("Too much msgs unacked, then stop poping. 
topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), 
queueId);
+            restNum = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 
offset + restNum;
+            future.complete(restNum);
+            return future;
+        }
+
         try {
             future.whenComplete((result, throwable) -> 
queueLockManager.unLock(lockKey));
             offset = getPopOffset(topic, requestHeader.getConsumerGroup(), 
queueId, requestHeader.getInitMode(),
@@ -667,6 +674,11 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             });
     }
 
+    private boolean isPopShouldStop(String topic, String group, int queueId) {
+        return 
brokerController.getBrokerConfig().isEnablePopMessageThreshold() &&
+                
brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic,
 group, queueId) > 
brokerController.getBrokerConfig().getPopInflightMessageThreshold();
+    }
+
     private long getPopOffset(String topic, String group, int queueId, int 
initMode, boolean init, String lockKey,
         boolean checkResetOffset) {
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 0a2c528f86..0a1bfa5d67 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -407,6 +407,9 @@ public class BrokerConfig extends BrokerIdentity {
      */
     private boolean enableSplitRegistration = false;
 
+    private long popInflightMessageThreshold = 10000;
+    private boolean enablePopMessageThreshold = false;
+
     private int splitRegistrationSize = 800;
 
     /**
@@ -1799,4 +1802,20 @@ public class BrokerConfig extends BrokerIdentity {
     public void setTransactionMetricFlushInterval(long 
transactionMetricFlushInterval) {
         this.transactionMetricFlushInterval = transactionMetricFlushInterval;
     }
+
+    public long getPopInflightMessageThreshold() {
+        return popInflightMessageThreshold;
+    }
+
+    public void setPopInflightMessageThreshold(long 
popInflightMessageThreshold) {
+        this.popInflightMessageThreshold = popInflightMessageThreshold;
+    }
+
+    public boolean isEnablePopMessageThreshold() {
+        return enablePopMessageThreshold;
+    }
+
+    public void setEnablePopMessageThreshold(boolean 
enablePopMessageThreshold) {
+        this.enablePopMessageThreshold = enablePopMessageThreshold;
+    }
 }

Reply via email to