This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 71a7a659be [ISSUE #7543] only call a single type of retry topic in pop (#7665)
71a7a659be is described below

commit 71a7a659bed15110d1146091bfb7a51d28ade562
Author: Zhouxiang Zhan <zhouxz...@apache.org>
AuthorDate: Fri Dec 15 16:09:11 2023 +0800

    [ISSUE #7543] only call a single type of retry topic in pop (#7665)
    
    * only call a single type of retry topic in pop
---
 .../broker/processor/PopMessageProcessor.java      | 85 ++++++++++++++--------
 .../broker/processor/PopMessageProcessorTest.java  |  2 +-
 2 files changed, 55 insertions(+), 32 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 58baecc05a..5d86ecc0cd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -351,27 +351,42 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         ExpressionMessageFilter finalMessageFilter = messageFilter;
         StringBuilder finalOrderCountInfo = orderCountInfo;
 
+        // Due to the design of the fields startOffsetInfo, msgOffsetInfo, and orderCountInfo,
+        // a single POP request could only invoke the popMsgFromQueue method once
+        // for either a normal topic or a retry topic's queue. Retry topics v1 and v2 are
+        // considered the same type because they share the same retry flag in previous fields.
+        // Therefore, needRetryV1 is designed as a subset of needRetry, and within a single request,
+        // only one type of retry topic is able to call popMsgFromQueue.
         boolean needRetry = randomQ % 5 == 0;
+        boolean needRetryV1 = false;
+        if 
(brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
+            needRetryV1 = randomQ % 2 == 0;
+        }
         long popTime = System.currentTimeMillis();
         CompletableFuture<Long> getMessageFuture = 
CompletableFuture.completedFuture(0L);
         if (needRetry && !requestHeader.isOrder()) {
-            TopicConfig retryTopicConfig =
-                
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
-            if (retryTopicConfig != null) {
-                for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
-                    int queueId = (randomQ + i) % 
retryTopicConfig.getReadQueueNums();
-                    getMessageFuture = getMessageFuture.thenCompose(restNum -> 
popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, 
requestHeader, queueId, restNum, reviveQid, channel, popTime, 
finalMessageFilter,
-                        startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
-                }
-            }
-            if 
(brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
+            if (needRetryV1) {
                 TopicConfig retryTopicConfigV1 =
                     
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
                 if (retryTopicConfigV1 != null) {
                     for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); 
i++) {
                         int queueId = (randomQ + i) % 
retryTopicConfigV1.getReadQueueNums();
-                        getMessageFuture = 
getMessageFuture.thenCompose(restNum -> 
popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, 
requestHeader, queueId, restNum, reviveQid, channel, popTime, 
finalMessageFilter,
-                            startOffsetInfo, msgOffsetInfo, 
finalOrderCountInfo));
+                        getMessageFuture = 
getMessageFuture.thenCompose(restNum ->
+                            popMsgFromQueue(retryTopicConfigV1.getTopicName(), 
requestHeader.getAttemptId(), true,
+                                getMessageResult, requestHeader, queueId, 
restNum, reviveQid, channel, popTime, finalMessageFilter,
+                                startOffsetInfo, msgOffsetInfo, 
finalOrderCountInfo));
+                    }
+                }
+            } else {
+                TopicConfig retryTopicConfig =
+                    
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
+                if (retryTopicConfig != null) {
+                    for (int i = 0; i < retryTopicConfig.getReadQueueNums(); 
i++) {
+                        int queueId = (randomQ + i) % 
retryTopicConfig.getReadQueueNums();
+                        getMessageFuture = 
getMessageFuture.thenCompose(restNum ->
+                            popMsgFromQueue(retryTopicConfig.getTopicName(), 
requestHeader.getAttemptId(), true,
+                                getMessageResult, requestHeader, queueId, 
restNum, reviveQid, channel, popTime, finalMessageFilter,
+                                startOffsetInfo, msgOffsetInfo, 
finalOrderCountInfo));
                     }
                 }
             }
@@ -380,33 +395,42 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             // read all queue
             for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
                 int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
-                getMessageFuture = getMessageFuture.thenCompose(restNum -> 
popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, 
requestHeader, queueId, restNum, reviveQid, channel, popTime, 
finalMessageFilter,
-                    startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
+                getMessageFuture = getMessageFuture.thenCompose(restNum ->
+                    popMsgFromQueue(topicConfig.getTopicName(), 
requestHeader.getAttemptId(), false,
+                        getMessageResult, requestHeader, queueId, restNum, 
reviveQid, channel, popTime, finalMessageFilter,
+                        startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
             }
         } else {
             int queueId = requestHeader.getQueueId();
-            getMessageFuture = getMessageFuture.thenCompose(restNum -> 
popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, 
requestHeader, queueId, restNum, reviveQid, channel, popTime, 
finalMessageFilter,
-                startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
+            getMessageFuture = getMessageFuture.thenCompose(restNum ->
+                popMsgFromQueue(topicConfig.getTopicName(), 
requestHeader.getAttemptId(), false,
+                    getMessageResult, requestHeader, queueId, restNum, 
reviveQid, channel, popTime, finalMessageFilter,
+                    startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
         }
         // if not full , fetch retry again
         if (!needRetry && getMessageResult.getMessageMapedList().size() < 
requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) {
-            TopicConfig retryTopicConfig =
-                
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
-            if (retryTopicConfig != null) {
-                for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
-                    int queueId = (randomQ + i) % 
retryTopicConfig.getReadQueueNums();
-                    getMessageFuture = getMessageFuture.thenCompose(restNum -> 
popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, 
requestHeader, queueId, restNum, reviveQid, channel, popTime, 
finalMessageFilter,
-                        startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
-                }
-            }
-            if 
(brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
+            if (needRetryV1) {
                 TopicConfig retryTopicConfigV1 =
                     
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
                 if (retryTopicConfigV1 != null) {
                     for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); 
i++) {
                         int queueId = (randomQ + i) % 
retryTopicConfigV1.getReadQueueNums();
-                        getMessageFuture = 
getMessageFuture.thenCompose(restNum -> 
popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, 
requestHeader, queueId, restNum, reviveQid, channel, popTime, 
finalMessageFilter,
-                            startOffsetInfo, msgOffsetInfo, 
finalOrderCountInfo));
+                        getMessageFuture = 
getMessageFuture.thenCompose(restNum ->
+                            popMsgFromQueue(retryTopicConfigV1.getTopicName(), 
requestHeader.getAttemptId(), true,
+                                getMessageResult, requestHeader, queueId, 
restNum, reviveQid, channel, popTime, finalMessageFilter,
+                                startOffsetInfo, msgOffsetInfo, 
finalOrderCountInfo));
+                    }
+                }
+            } else {
+                TopicConfig retryTopicConfig =
+                    
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
+                if (retryTopicConfig != null) {
+                    for (int i = 0; i < retryTopicConfig.getReadQueueNums(); 
i++) {
+                        int queueId = (randomQ + i) % 
retryTopicConfig.getReadQueueNums();
+                        getMessageFuture = 
getMessageFuture.thenCompose(restNum ->
+                            popMsgFromQueue(retryTopicConfig.getTopicName(), 
requestHeader.getAttemptId(), true,
+                                getMessageResult, requestHeader, queueId, 
restNum, reviveQid, channel, popTime, finalMessageFilter,
+                                startOffsetInfo, msgOffsetInfo, 
finalOrderCountInfo));
                     }
                 }
             }
@@ -489,12 +513,11 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         return null;
     }
 
-    private CompletableFuture<Long> popMsgFromQueue(String attemptId, boolean 
isRetry, GetMessageResult getMessageResult,
+    private CompletableFuture<Long> popMsgFromQueue(String targetTopic, String 
attemptId, boolean isRetry, GetMessageResult getMessageResult,
         PopMessageRequestHeader requestHeader, int queueId, long restNum, int 
reviveQid,
         Channel channel, long popTime, ExpressionMessageFilter messageFilter, 
StringBuilder startOffsetInfo,
         StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
-        String topic = isRetry ? 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
-            requestHeader.getConsumerGroup()) : requestHeader.getTopic();
+        String topic = targetTopic;
         String lockKey =
             topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + 
PopAckConstants.SPLIT + queueId;
         boolean isOrder = requestHeader.isOrder();
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index 44f04066ca..d8c8fa1034 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -76,7 +76,7 @@ public class PopMessageProcessorTest {
         brokerController.getBrokerConfig().setEnablePopBufferMerge(true);
         popMessageProcessor = new PopMessageProcessor(brokerController);
         when(handlerContext.channel()).thenReturn(embeddedChannel);
-        
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new 
TopicConfig());
+        
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new 
TopicConfig(topic));
         clientChannelInfo = new ClientChannelInfo(embeddedChannel);
         ConsumerData consumerData = createConsumerData(group, topic);
         brokerController.getConsumerManager().registerConsumer(

Reply via email to