This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new 18c30cbab6 [ISSUE #8592] Not notify long polling request when pop orderly consume blocked (#8593)
18c30cbab6 is described below

commit 18c30cbab653a2e5c383aace271e1972204b5291
Author: lizhimins <707364...@qq.com>
AuthorDate: Thu Aug 29 15:05:52 2024 +0800

    [ISSUE #8592] Not notify long polling request when pop orderly consume blocked (#8593)
---
 .../broker/processor/PopMessageProcessor.java | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 6073023722..47ef8e4013 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -551,18 +551,24 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
             offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(), true, lockKey, true);
-            if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic,
-                requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
-                future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum);
-                return future;
-            }
+            // Current requests would calculate the total number of messages
+            // waiting to be filtered for new message arrival notifications in
+            // the long-polling service, need disregarding the backlog in order
+            // consumption scenario. If rest message num including the blocked
+            // queue accumulation would lead to frequent unnecessary wake-ups
+            // of long-polling requests, resulting unnecessary CPU usage.
+            // When client ack message, long-polling request would be notifications
+            // by AckMessageProcessor.ackOrderly() and message will not be delayed.
             if (isOrder) {
+                if (brokerController.getConsumerOrderInfoManager().checkBlock(
+                    attemptId, topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
+                    // should not add accumulation(max offset - consumer offset) here
+                    future.complete(restNum);
+                    return future;
+                }
                 this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(
-                    topic,
-                    requestHeader.getConsumerGroup(),
-                    queueId
-                );
+                    topic, requestHeader.getConsumerGroup(), queueId);
             }
             if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) {