This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 18c30cbab6 [ISSUE #8592] Not notify long polling request when pop orderly consume blocked (#8593)
18c30cbab6 is described below

commit 18c30cbab653a2e5c383aace271e1972204b5291
Author: lizhimins <707364...@qq.com>
AuthorDate: Thu Aug 29 15:05:52 2024 +0800

    [ISSUE #8592] Not notify long polling request when pop orderly consume blocked (#8593)
---
 .../broker/processor/PopMessageProcessor.java      | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 6073023722..47ef8e4013 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -551,18 +551,24 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
             offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
                 true, lockKey, true);
-            if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic,
-                requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
-                future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum);
-                return future;
-            }
 
+            // Current requests would calculate the total number of messages
+            // waiting to be filtered for new message arrival notifications in
+            // the long-polling service, need disregarding the backlog in order
+            // consumption scenario. If rest message num including the blocked
+            // queue accumulation would lead to frequent unnecessary wake-ups
+            // of long-polling requests, resulting unnecessary CPU usage.
+            // When client ack message, long-polling request would be notifications
+            // by AckMessageProcessor.ackOrderly() and message will not be delayed.
             if (isOrder) {
+                if (brokerController.getConsumerOrderInfoManager().checkBlock(
+                    attemptId, topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
+                    // should not add accumulation(max offset - consumer offset) here
+                    future.complete(restNum);
+                    return future;
+                }
                 this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(
-                    topic,
-                    requestHeader.getConsumerGroup(),
-                    queueId
-                );
+                    topic, requestHeader.getConsumerGroup(), queueId);
             }
 
             if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) {

Reply via email to