This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8a36471a19 [ISSUE #7543] Add enableRetryTopicV2 brokerConfig (#7734)
8a36471a19 is described below

commit 8a36471a19aea4a9052a2ad4508c9ed75ad0fe0d
Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com>
AuthorDate: Thu Jan 11 11:11:53 2024 +0800

    [ISSUE #7543] Add enableRetryTopicV2 brokerConfig (#7734)
    
    * Add enableRetryTopicV2
---
 .../broker/metrics/ConsumerLagCalculator.java      |   4 +-
 .../broker/offset/ConsumerOrderInfoManager.java    |   4 +-
 .../broker/processor/AckMessageProcessor.java      |   2 +-
 .../broker/processor/AdminBrokerProcessor.java     |   6 +-
 .../broker/processor/NotificationProcessor.java    |  68 +++++++------
 .../broker/processor/PeekMessageProcessor.java     |  12 ++-
 .../broker/processor/PopMessageProcessor.java      | 105 +++++++++------------
 .../broker/processor/PopReviveService.java         |   2 +-
 .../broker/processor/AdminBrokerProcessorTest.java |   7 +-
 .../rocketmq/client/impl/MQClientAPIImplTest.java  |   8 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |   9 ++
 .../org/apache/rocketmq/common/KeyBuilder.java     |  11 +++
 .../rocketmq/common/consumer/ReceiptHandle.java    |  11 ++-
 .../org/apache/rocketmq/common/KeyBuilderTest.java |  10 +-
 .../proxy/processor/ConsumerProcessorTest.java     |   5 +-
 .../proxy/processor/ProducerProcessorTest.java     |   3 +-
 .../service/message/LocalMessageServiceTest.java   |   4 +-
 .../remoting/protocol/header/ExtraInfoUtil.java    |  68 ++++++++-----
 .../protocol/header/ExtraInfoUtilTest.java         |   4 +-
 .../test/container/PopSlaveActingMasterIT.java     |  12 ++-
 20 files changed, 204 insertions(+), 151 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index d1f3fffde7..1930d0dfcb 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -176,7 +176,7 @@ public class ConsumerLagCalculator {
                 }
 
                 if (isPop) {
-                    String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
group);
+                    String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
group, brokerConfig.isEnableRetryTopicV2());
                     TopicConfig retryTopicConfig = 
topicConfigManager.selectTopicConfig(retryTopic);
                     if (retryTopicConfig != null) {
                         int retryTopicPerm = retryTopicConfig.getPerm() & 
brokerConfig.getBrokerPermission();
@@ -185,7 +185,7 @@ public class ConsumerLagCalculator {
                             continue;
                         }
                     }
-                    if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
+                    if (brokerConfig.isEnableRetryTopicV2() && 
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
                         String retryTopicV1 = 
KeyBuilder.buildPopRetryTopicV1(topic, group);
                         TopicConfig retryTopicConfigV1 = 
topicConfigManager.selectTopicConfig(retryTopicV1);
                         if (retryTopicConfigV1 != null) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 2e2850dbbc..4eccc6c037 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -121,7 +121,7 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
             Set<Long> offsetSet = offsetConsumedCount.keySet();
             for (Long offset : offsetSet) {
                 Integer consumedTimes = 
offsetConsumedCount.getOrDefault(offset, 0);
-                ExtraInfoUtil.buildQueueOffsetOrderCountInfo(orderInfoBuilder, 
isRetry, queueId, offset, consumedTimes);
+                ExtraInfoUtil.buildQueueOffsetOrderCountInfo(orderInfoBuilder, 
topic, queueId, offset, consumedTimes);
                 minConsumedTimes = Math.min(minConsumedTimes, consumedTimes);
             }
 
@@ -136,7 +136,7 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
 
         // for compatibility
         // the old pop sdk use queueId to get consumedTimes from orderCountInfo
-        ExtraInfoUtil.buildQueueIdOrderCountInfo(orderInfoBuilder, isRetry, 
queueId, minConsumedTimes);
+        ExtraInfoUtil.buildQueueIdOrderCountInfo(orderInfoBuilder, topic, 
queueId, minConsumedTimes);
         updateLockFreeTimestamp(topic, group, queueId, orderInfo);
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 59a3e63b2a..9a56498632 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -196,7 +196,7 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
         } else {
             // batch ack
             consumeGroup = batchAck.getConsumerGroup();
-            topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), 
batchAck.getConsumerGroup(), 
ExtraInfoUtil.RETRY_TOPIC.equals(batchAck.getRetry()));
+            topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), 
batchAck.getConsumerGroup(), batchAck.getRetry());
             qId = batchAck.getQueueId();
             rqId = batchAck.getReviveQueueId();
             startOffset = batchAck.getStartOffset();
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 978c2e81d0..4cc6d53b15 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -555,9 +555,9 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         // delete pop retry topics first
         try {
             for (String group : groups) {
-                final String popRetryTopic = 
KeyBuilder.buildPopRetryTopic(topic, group);
-                if 
(brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != 
null) {
-                    deleteTopicInBroker(popRetryTopic);
+                final String popRetryTopicV2 = 
KeyBuilder.buildPopRetryTopic(topic, group, true);
+                if 
(brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV2) != 
null) {
+                    deleteTopicInBroker(popRetryTopicV2);
                 }
                 final String popRetryTopicV1 = 
KeyBuilder.buildPopRetryTopicV1(topic, group);
                 if 
(brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != 
null) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 91d275dfe0..0deb3ee707 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.longpolling.PollingHeader;
 import org.apache.rocketmq.broker.longpolling.PollingResult;
 import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -121,44 +122,30 @@ public class NotificationProcessor implements 
NettyRequestProcessor {
         int randomQ = random.nextInt(100);
         boolean hasMsg = false;
         boolean needRetry = randomQ % 5 == 0;
+        BrokerConfig brokerConfig = brokerController.getBrokerConfig();
         if (needRetry) {
-            TopicConfig retryTopicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
-            if (retryTopicConfig != null) {
-                for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
-                    int queueId = (randomQ + i) % 
retryTopicConfig.getReadQueueNums();
-                    hasMsg = hasMsgFromQueue(true, requestHeader, queueId);
-                    if (hasMsg) {
-                        break;
-                    }
-                }
+            String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
+            hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader);
+            if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && 
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
+                String retryTopicConfigV1 = 
KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), 
requestHeader.getConsumerGroup());
+                hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, 
requestHeader);
             }
         }
         if (!hasMsg) {
             if (requestHeader.getQueueId() < 0) {
                 // read all queue
-                for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
-                    int queueId = (randomQ + i) % 
topicConfig.getReadQueueNums();
-                    hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
-                    if (hasMsg) {
-                        break;
-                    }
-                }
+                hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader);
             } else {
                 int queueId = requestHeader.getQueueId();
-                hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
+                hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), 
requestHeader, queueId);
             }
             // if it doesn't have message, fetch retry again
             if (!needRetry && !hasMsg) {
-                TopicConfig retryTopicConfig =
-                    
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
-                if (retryTopicConfig != null) {
-                    for (int i = 0; i < retryTopicConfig.getReadQueueNums(); 
i++) {
-                        int queueId = (randomQ + i) % 
retryTopicConfig.getReadQueueNums();
-                        hasMsg = hasMsgFromQueue(true, requestHeader, queueId);
-                        if (hasMsg) {
-                            break;
-                        }
-                    }
+                String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
+                hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader);
+                if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && 
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
+                    String retryTopicConfigV1 = 
KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), 
requestHeader.getConsumerGroup());
+                    hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, 
requestHeader);
                 }
             }
         }
@@ -173,15 +160,34 @@ public class NotificationProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
-    private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader 
requestHeader, int queueId) {
+    private boolean hasMsgFromTopic(String topicName, int randomQ, 
NotificationRequestHeader requestHeader) {
+        boolean hasMsg;
+        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(topicName);
+        return hasMsgFromTopic(topicConfig, randomQ, requestHeader);
+    }
+
+    private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, 
NotificationRequestHeader requestHeader) {
+        boolean hasMsg;
+        if (topicConfig != null) {
+            for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
+                int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
+                hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), 
requestHeader, queueId);
+                if (hasMsg) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean hasMsgFromQueue(String targetTopic, 
NotificationRequestHeader requestHeader, int queueId) {
         if (Boolean.TRUE.equals(requestHeader.getOrder())) {
             if 
(this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getAttemptId(),
 requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
                 return false;
             }
         }
-        String topic = isRetry ? 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup()) : requestHeader.getTopic();
-        long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), 
queueId);
-        long restNum = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 
offset;
+        long offset = getPopOffset(targetTopic, 
requestHeader.getConsumerGroup(), queueId);
+        long restNum = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(targetTopic, 
queueId) - offset;
         return restNum > 0;
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index e1e0e13e53..a72759883c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -133,8 +134,10 @@ public class PeekMessageProcessor implements 
NettyRequestProcessor {
         boolean needRetry = randomQ % 5 == 0;
         long popTime = System.currentTimeMillis();
         long restNum = 0;
+        BrokerConfig brokerConfig = brokerController.getBrokerConfig();
         if (needRetry) {
-            TopicConfig retryTopicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
+            TopicConfig retryTopicConfig = 
this.brokerController.getTopicConfigManager()
+                
.selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()));
             if (retryTopicConfig != null) {
                 for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
                     int queueId = (randomQ + i) % 
retryTopicConfig.getReadQueueNums();
@@ -154,7 +157,8 @@ public class PeekMessageProcessor implements 
NettyRequestProcessor {
         }
         // if not full , fetch retry again
         if (!needRetry && getMessageResult.getMessageMapedList().size() < 
requestHeader.getMaxMsgNums()) {
-            TopicConfig retryTopicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
+            TopicConfig retryTopicConfig = 
this.brokerController.getTopicConfigManager()
+                
.selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()));
             if (retryTopicConfig != null) {
                 for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
                     int queueId = (randomQ + i) % 
retryTopicConfig.getReadQueueNums();
@@ -226,7 +230,9 @@ public class PeekMessageProcessor implements 
NettyRequestProcessor {
     private long peekMsgFromQueue(boolean isRetry, GetMessageResult 
getMessageResult,
         PeekMessageRequestHeader requestHeader, int queueId, long restNum, int 
reviveQid, Channel channel,
         long popTime) {
-        String topic = isRetry ? 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup()) : requestHeader.getTopic();
+        String topic = isRetry ?
+            KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), 
brokerController.getBrokerConfig().isEnableRetryTopicV2())
+            : requestHeader.getTopic();
         GetMessageResult getMessageTmpResult;
         long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), 
queueId);
         restNum = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 
offset + restNum;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 5d86ecc0cd..02e266a786 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -45,6 +45,7 @@ import 
org.apache.rocketmq.broker.longpolling.PopLongPollingService;
 import org.apache.rocketmq.broker.longpolling.PopRequest;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PopAckConstants;
@@ -289,6 +290,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
             return response;
         }
 
+        BrokerConfig brokerConfig = brokerController.getBrokerConfig();
         ExpressionMessageFilter messageFilter = null;
         if (requestHeader.getExp() != null && requestHeader.getExp().length() 
> 0) {
             try {
@@ -296,7 +298,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
                 
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
                     requestHeader.getTopic(), subscriptionData);
 
-                String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup());
+                String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
                 SubscriptionData retrySubscriptionData = 
FilterAPI.build(retryTopic, SubscriptionData.SUB_ALL, 
requestHeader.getExpType());
                 
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
                     retryTopic, retrySubscriptionData);
@@ -330,7 +332,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
                 
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
                     requestHeader.getTopic(), subscriptionData);
 
-                String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup());
+                String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
                 SubscriptionData retrySubscriptionData = 
FilterAPI.build(retryTopic, "*", ExpressionType.TAG);
                 
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
                     retryTopic, retrySubscriptionData);
@@ -359,47 +361,26 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         // only one type of retry topic is able to call popMsgFromQueue.
         boolean needRetry = randomQ % 5 == 0;
         boolean needRetryV1 = false;
-        if 
(brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) {
+        if (brokerConfig.isEnableRetryTopicV2() && 
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
             needRetryV1 = randomQ % 2 == 0;
         }
         long popTime = System.currentTimeMillis();
         CompletableFuture<Long> getMessageFuture = 
CompletableFuture.completedFuture(0L);
         if (needRetry && !requestHeader.isOrder()) {
             if (needRetryV1) {
-                TopicConfig retryTopicConfigV1 =
-                    
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
-                if (retryTopicConfigV1 != null) {
-                    for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); 
i++) {
-                        int queueId = (randomQ + i) % 
retryTopicConfigV1.getReadQueueNums();
-                        getMessageFuture = 
getMessageFuture.thenCompose(restNum ->
-                            popMsgFromQueue(retryTopicConfigV1.getTopicName(), 
requestHeader.getAttemptId(), true,
-                                getMessageResult, requestHeader, queueId, 
restNum, reviveQid, channel, popTime, finalMessageFilter,
-                                startOffsetInfo, msgOffsetInfo, 
finalOrderCountInfo));
-                    }
-                }
+                String retryTopic = 
KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), 
requestHeader.getConsumerGroup());
+                getMessageFuture = popMsgFromTopic(retryTopic, true, 
getMessageResult, requestHeader, reviveQid, channel,
+                    popTime, finalMessageFilter, startOffsetInfo, 
msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
             } else {
-                TopicConfig retryTopicConfig =
-                    
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
-                if (retryTopicConfig != null) {
-                    for (int i = 0; i < retryTopicConfig.getReadQueueNums(); 
i++) {
-                        int queueId = (randomQ + i) % 
retryTopicConfig.getReadQueueNums();
-                        getMessageFuture = 
getMessageFuture.thenCompose(restNum ->
-                            popMsgFromQueue(retryTopicConfig.getTopicName(), 
requestHeader.getAttemptId(), true,
-                                getMessageResult, requestHeader, queueId, 
restNum, reviveQid, channel, popTime, finalMessageFilter,
-                                startOffsetInfo, msgOffsetInfo, 
finalOrderCountInfo));
-                    }
-                }
+                String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
+                getMessageFuture = popMsgFromTopic(retryTopic, true, 
getMessageResult, requestHeader, reviveQid, channel,
+                    popTime, finalMessageFilter, startOffsetInfo, 
msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
             }
         }
         if (requestHeader.getQueueId() < 0) {
             // read all queue
-            for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
-                int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
-                getMessageFuture = getMessageFuture.thenCompose(restNum ->
-                    popMsgFromQueue(topicConfig.getTopicName(), 
requestHeader.getAttemptId(), false,
-                        getMessageResult, requestHeader, queueId, restNum, 
reviveQid, channel, popTime, finalMessageFilter,
-                        startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
-            }
+            getMessageFuture = popMsgFromTopic(topicConfig, false, 
getMessageResult, requestHeader, reviveQid, channel,
+                popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, 
orderCountInfo, randomQ, getMessageFuture);
         } else {
             int queueId = requestHeader.getQueueId();
             getMessageFuture = getMessageFuture.thenCompose(restNum ->
@@ -410,29 +391,13 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         // if not full , fetch retry again
         if (!needRetry && getMessageResult.getMessageMapedList().size() < 
requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) {
             if (needRetryV1) {
-                TopicConfig retryTopicConfigV1 =
-                    
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
-                if (retryTopicConfigV1 != null) {
-                    for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); 
i++) {
-                        int queueId = (randomQ + i) % 
retryTopicConfigV1.getReadQueueNums();
-                        getMessageFuture = 
getMessageFuture.thenCompose(restNum ->
-                            popMsgFromQueue(retryTopicConfigV1.getTopicName(), 
requestHeader.getAttemptId(), true,
-                                getMessageResult, requestHeader, queueId, 
restNum, reviveQid, channel, popTime, finalMessageFilter,
-                                startOffsetInfo, msgOffsetInfo, 
finalOrderCountInfo));
-                    }
-                }
+                String retryTopicV1 = 
KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), 
requestHeader.getConsumerGroup());
+                getMessageFuture = popMsgFromTopic(retryTopicV1, true, 
getMessageResult, requestHeader, reviveQid, channel,
+                    popTime, finalMessageFilter, startOffsetInfo, 
msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
             } else {
-                TopicConfig retryTopicConfig =
-                    
this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
 requestHeader.getConsumerGroup()));
-                if (retryTopicConfig != null) {
-                    for (int i = 0; i < retryTopicConfig.getReadQueueNums(); 
i++) {
-                        int queueId = (randomQ + i) % 
retryTopicConfig.getReadQueueNums();
-                        getMessageFuture = 
getMessageFuture.thenCompose(restNum ->
-                            popMsgFromQueue(retryTopicConfig.getTopicName(), 
requestHeader.getAttemptId(), true,
-                                getMessageResult, requestHeader, queueId, 
restNum, reviveQid, channel, popTime, finalMessageFilter,
-                                startOffsetInfo, msgOffsetInfo, 
finalOrderCountInfo));
-                    }
-                }
+                String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
+                getMessageFuture = popMsgFromTopic(retryTopic, true, 
getMessageResult, requestHeader, reviveQid, channel,
+                    popTime, finalMessageFilter, startOffsetInfo, 
msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
             }
         }
 
@@ -513,11 +478,35 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         return null;
     }
 
-    private CompletableFuture<Long> popMsgFromQueue(String targetTopic, String 
attemptId, boolean isRetry, GetMessageResult getMessageResult,
+    private CompletableFuture<Long> popMsgFromTopic(TopicConfig topicConfig, 
boolean isRetry, GetMessageResult getMessageResult,
+        PopMessageRequestHeader requestHeader, int reviveQid, Channel channel, 
long popTime,
+        ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
+        StringBuilder msgOffsetInfo, StringBuilder orderCountInfo, int 
randomQ, CompletableFuture<Long> getMessageFuture) {
+        if (topicConfig != null) {
+            for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
+                int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
+                getMessageFuture = getMessageFuture.thenCompose(restNum ->
+                    popMsgFromQueue(topicConfig.getTopicName(), 
requestHeader.getAttemptId(), isRetry,
+                        getMessageResult, requestHeader, queueId, restNum, 
reviveQid, channel, popTime, messageFilter,
+                        startOffsetInfo, msgOffsetInfo, orderCountInfo));
+            }
+        }
+        return getMessageFuture;
+    }
+
+    private CompletableFuture<Long> popMsgFromTopic(String topic, boolean 
isRetry, GetMessageResult getMessageResult,
+        PopMessageRequestHeader requestHeader, int reviveQid, Channel channel, 
long popTime,
+        ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
+        StringBuilder msgOffsetInfo, StringBuilder orderCountInfo, int 
randomQ, CompletableFuture<Long> getMessageFuture) {
+        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+        return popMsgFromTopic(topicConfig, isRetry, getMessageResult, 
requestHeader, reviveQid, channel, popTime,
+            messageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, 
randomQ, getMessageFuture);
+    }
+
+    private CompletableFuture<Long> popMsgFromQueue(String topic, String 
attemptId, boolean isRetry, GetMessageResult getMessageResult,
         PopMessageRequestHeader requestHeader, int queueId, long restNum, int 
reviveQid,
         Channel channel, long popTime, ExpressionMessageFilter messageFilter, 
StringBuilder startOffsetInfo,
         StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
-        String topic = targetTopic;
         String lockKey =
             topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + 
PopAckConstants.SPLIT + queueId;
         boolean isOrder = requestHeader.isOrder();
@@ -618,8 +607,8 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
                             return atomicRestNum.get() + 
result.getMessageCount();
                         }
                     }
-                    ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, 
isRetry, queueId, finalOffset);
-                    ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, isRetry, 
queueId,
+                    ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 
queueId, finalOffset);
+                    ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 
queueId,
                         result.getMessageQueueOffset());
                 } else if 
((GetMessageStatus.NO_MATCHED_MESSAGE.equals(result.getStatus())
                     || 
GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 8d25bc57e1..104b78d441 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -103,7 +103,7 @@ public class PopReviveService extends ServiceThread {
     private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt 
messageExt) {
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         if 
(!popCheckPoint.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-            
msgInner.setTopic(KeyBuilder.buildPopRetryTopic(popCheckPoint.getTopic(), 
popCheckPoint.getCId()));
+            
msgInner.setTopic(KeyBuilder.buildPopRetryTopic(popCheckPoint.getTopic(), 
popCheckPoint.getCId(), 
brokerController.getBrokerConfig().isEnableRetryTopicV2()));
         } else {
             msgInner.setTopic(popCheckPoint.getTopic());
         }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index c6b889baee..e4fcc690d1 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -278,15 +278,16 @@ public class AdminBrokerProcessorTest {
     public void testDeleteWithPopRetryTopic() throws Exception {
         String topic = "topicA";
         String anotherTopic = "another_topicA";
+        BrokerConfig brokerConfig = new BrokerConfig();
 
         topicConfigManager = mock(TopicConfigManager.class);
         
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
         final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<>();
         topicConfigTable.put(topic, new TopicConfig());
-        topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new 
TopicConfig());
+        topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1", 
brokerConfig.isEnableRetryTopicV2()), new TopicConfig());
 
         topicConfigTable.put(anotherTopic, new TopicConfig());
-        topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, 
"cid2"), new TopicConfig());
+        topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, 
"cid2", brokerConfig.isEnableRetryTopicV2()), new TopicConfig());
         
when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);
         
when(topicConfigManager.selectTopicConfig(anyString())).thenAnswer(invocation 
-> {
             final String selectTopic = invocation.getArgument(0);
@@ -301,7 +302,7 @@ public class AdminBrokerProcessorTest {
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
 
         verify(topicConfigManager).deleteTopicConfig(topic);
-        
verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic,
 "cid1"));
+        
verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic,
 "cid1", brokerConfig.isEnableRetryTopicV2()));
         verify(messageStore, times(2)).deleteTopics(anySet());
     }
 
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index cf399802ba..9f1a71c92f 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -446,10 +446,10 @@ public class MQClientAPIImplTest {
                 responseHeader.setReviveQid(0);
                 responseHeader.setRestNum(1);
                 StringBuilder startOffsetInfo = new StringBuilder(64);
-                ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, false, 0, 
0L);
+                ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 
0L);
                 responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
                 StringBuilder msgOffsetInfo = new StringBuilder(64);
-                ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, false, 0, 
Collections.singletonList(0L));
+                ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, 
Collections.singletonList(0L));
                 responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
                 response.setRemark("FOUND");
                 response.makeCustomHeaderToNet();
@@ -515,10 +515,10 @@ public class MQClientAPIImplTest {
                 responseHeader.setReviveQid(0);
                 responseHeader.setRestNum(1);
                 StringBuilder startOffsetInfo = new StringBuilder(64);
-                ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, false, 0, 
0L);
+                ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 
0L);
                 responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
                 StringBuilder msgOffsetInfo = new StringBuilder(64);
-                ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, false, 0, 
Collections.singletonList(0L));
+                ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, 
Collections.singletonList(0L));
                 responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
                 response.setRemark("FOUND");
                 response.makeCustomHeaderToNet();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a4a553ad5b..bedc7f386b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -225,6 +225,7 @@ public class BrokerConfig extends BrokerIdentity {
     private boolean initPopOffsetByCheckMsgInMem = true;
     // read message from pop retry topic v1, for the compatibility, will be 
removed in the future version
     private boolean retrieveMessageFromPopRetryTopicV1 = true;
+    private boolean enableRetryTopicV2 = false;
 
     private boolean realTimeNotifyConsumerChange = true;
 
@@ -1309,6 +1310,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.retrieveMessageFromPopRetryTopicV1 = 
retrieveMessageFromPopRetryTopicV1;
     }
 
+    public boolean isEnableRetryTopicV2() {
+        return enableRetryTopicV2;
+    }
+
+    public void setEnableRetryTopicV2(boolean enableRetryTopicV2) {
+        this.enableRetryTopicV2 = enableRetryTopicV2;
+    }
+
     public boolean isRealTimeNotifyConsumerChange() {
         return realTimeNotifyConsumerChange;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java 
b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
index 0f77c96ab0..910a73b713 100644
--- a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
@@ -22,7 +22,18 @@ public class KeyBuilder {
     private static final char POP_RETRY_SEPARATOR_V2 = '+';
     private static final String POP_RETRY_REGEX_SEPARATOR_V2 = "\\+";
 
+    public static String buildPopRetryTopic(String topic, String cid, boolean 
enableRetryV2) {
+        if (enableRetryV2) {
+            return buildPopRetryTopicV2(topic, cid);
+        }
+        return buildPopRetryTopicV1(topic, cid);
+    }
+
     public static String buildPopRetryTopic(String topic, String cid) {
+        return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1 
+ topic;
+    }
+
+    public static String buildPopRetryTopicV2(String topic, String cid) {
         return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2 
+ topic;
     }
 
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/consumer/ReceiptHandle.java 
b/common/src/main/java/org/apache/rocketmq/common/consumer/ReceiptHandle.java
index 392a3ae339..daaaee5600 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/consumer/ReceiptHandle.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/consumer/ReceiptHandle.java
@@ -26,6 +26,8 @@ public class ReceiptHandle {
     private static final String SEPARATOR = MessageConst.KEY_SEPARATOR;
     public static final String NORMAL_TOPIC = "0";
     public static final String RETRY_TOPIC = "1";
+
+    public static final String RETRY_TOPIC_V2 = "2";
     private final long startOffset;
     private final long retrieveTime;
     private final long invisibleTime;
@@ -220,12 +222,15 @@ public class ReceiptHandle {
     }
 
     public boolean isRetryTopic() {
-        return RETRY_TOPIC.equals(topicType);
+        return RETRY_TOPIC.equals(topicType) || 
RETRY_TOPIC_V2.equals(topicType);
     }
 
     public String getRealTopic(String topic, String groupName) {
-        if (isRetryTopic()) {
-            return KeyBuilder.buildPopRetryTopic(topic, groupName);
+        if (RETRY_TOPIC.equals(topicType)) {
+            return KeyBuilder.buildPopRetryTopicV1(topic, groupName);
+        }
+        if (RETRY_TOPIC_V2.equals(topicType)) {
+            return KeyBuilder.buildPopRetryTopicV2(topic, groupName);
         }
         return topic;
     }
diff --git 
a/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java 
b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
index 3c75871eaf..47191c907f 100644
--- a/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java
@@ -27,7 +27,7 @@ public class KeyBuilderTest {
 
     @Test
     public void testBuildPopRetryTopic() {
-        assertThat(KeyBuilder.buildPopRetryTopic(topic, 
group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + "+" + topic);
+        assertThat(KeyBuilder.buildPopRetryTopicV2(topic, 
group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + "+" + topic);
     }
 
     @Test
@@ -37,25 +37,25 @@ public class KeyBuilderTest {
 
     @Test
     public void testParseNormalTopic() {
-        String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+        String popRetryTopic = KeyBuilder.buildPopRetryTopicV2(topic, group);
         assertThat(KeyBuilder.parseNormalTopic(popRetryTopic, 
group)).isEqualTo(topic);
 
         String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
         assertThat(KeyBuilder.parseNormalTopic(popRetryTopicV1, 
group)).isEqualTo(topic);
 
-        popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+        popRetryTopic = KeyBuilder.buildPopRetryTopicV2(topic, group);
         
assertThat(KeyBuilder.parseNormalTopic(popRetryTopic)).isEqualTo(topic);
     }
 
     @Test
     public void testParseGroup() {
-        String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+        String popRetryTopic = KeyBuilder.buildPopRetryTopicV2(topic, group);
         assertThat(KeyBuilder.parseGroup(popRetryTopic)).isEqualTo(group);
     }
 
     @Test
     public void testIsPopRetryTopicV2() {
-        String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
+        String popRetryTopic = KeyBuilder.buildPopRetryTopicV2(topic, group);
         
assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopic)).isEqualTo(true);
         String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
         
assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopicV1)).isEqualTo(false);
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
index db268a06e6..f154033e45 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.consumer.PopResult;
 import org.apache.rocketmq.client.consumer.PopStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.ConsumeInitMode;
@@ -169,7 +170,7 @@ public class ConsumerProcessorTest extends 
BaseProcessorTest {
             CONSUMER_GROUP, TOPIC, 3000).get();
 
         assertEquals(AckStatus.OK, ackResult.getStatus());
-        assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP), 
requestHeaderArgumentCaptor.getValue().getTopic());
+        assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new 
BrokerConfig().isEnableRetryTopicV2()), 
requestHeaderArgumentCaptor.getValue().getTopic());
         assertEquals(CONSUMER_GROUP, 
requestHeaderArgumentCaptor.getValue().getConsumerGroup());
         assertEquals(handle.getReceiptHandle(), 
requestHeaderArgumentCaptor.getValue().getExtraInfo());
     }
@@ -292,7 +293,7 @@ public class ConsumerProcessorTest extends 
BaseProcessorTest {
             CONSUMER_GROUP, TOPIC, 1000, 3000).get();
 
         assertEquals(AckStatus.OK, ackResult.getStatus());
-        assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP), 
requestHeaderArgumentCaptor.getValue().getTopic());
+        assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new 
BrokerConfig().isEnableRetryTopicV2()), 
requestHeaderArgumentCaptor.getValue().getTopic());
         assertEquals(CONSUMER_GROUP, 
requestHeaderArgumentCaptor.getValue().getConsumerGroup());
         assertEquals(1000, 
requestHeaderArgumentCaptor.getValue().getInvisibleTime().longValue());
         assertEquals(handle.getReceiptHandle(), 
requestHeaderArgumentCaptor.getValue().getExtraInfo());
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
index de63b7e75f..af61c80315 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.attribute.TopicMessageType;
@@ -185,7 +186,7 @@ public class ProducerProcessorTest extends 
BaseProcessorTest {
         when(this.messageService.sendMessageBack(any(), any(), anyString(), 
requestHeaderArgumentCaptor.capture(), anyLong()))
             
.thenReturn(CompletableFuture.completedFuture(mock(RemotingCommand.class)));
 
-        MessageExt messageExt = 
createMessageExt(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP), "", 16, 
3000);
+        MessageExt messageExt = 
createMessageExt(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new 
BrokerConfig().isEnableRetryTopicV2()), "", 16, 3000);
         RemotingCommand remotingCommand = 
this.producerProcessor.forwardMessageToDeadLetterQueue(
             createContext(),
             create(messageExt),
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
index 51fea167d0..e959244dec 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
@@ -286,8 +286,8 @@ public class LocalMessageServiceTest extends InitConfigTest 
{
         MessageExt message2 = buildMessageExt(topic, 0, startOffset + 1);
         messageExtList.add(message2);
         messageOffsetList.add(startOffset + 1);
-        ExtraInfoUtil.buildStartOffsetInfo(startOffsetStringBuilder, false, 
queueId, startOffset);
-        ExtraInfoUtil.buildMsgOffsetInfo(messageOffsetStringBuilder, false, 
queueId, messageOffsetList);
+        ExtraInfoUtil.buildStartOffsetInfo(startOffsetStringBuilder, topic, 
queueId, startOffset);
+        ExtraInfoUtil.buildMsgOffsetInfo(messageOffsetStringBuilder, topic, 
queueId, messageOffsetList);
         byte[] body2 = MessageDecoder.encode(message2, false);
         ByteBuffer byteBuffer1 = ByteBuffer.wrap(body1);
         ByteBuffer byteBuffer2 = ByteBuffer.wrap(body2);
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
index 13094331e6..a6a4a77767 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
@@ -26,7 +26,8 @@ import org.apache.rocketmq.common.message.MessageConst;
 
 public class ExtraInfoUtil {
     private static final String NORMAL_TOPIC = "0";
-    public static final String RETRY_TOPIC = "1";
+    private static final String RETRY_TOPIC = "1";
+    private static final String RETRY_TOPIC_V2 = "2";
     private static final String QUEUE_OFFSET = "qo";
 
     public static String[] split(String extraInfo) {
@@ -69,14 +70,24 @@ public class ExtraInfoUtil {
             throw new IllegalArgumentException("getRealTopic fail, 
extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
         }
         if (RETRY_TOPIC.equals(extraInfoStrs[4])) {
-            return KeyBuilder.buildPopRetryTopic(topic, cid);
+            return KeyBuilder.buildPopRetryTopicV1(topic, cid);
+        } else if (RETRY_TOPIC_V2.equals(extraInfoStrs[4])) {
+            return KeyBuilder.buildPopRetryTopicV2(topic, cid);
         } else {
             return topic;
         }
     }
 
-    public static String getRealTopic(String topic, String cid, boolean 
isRetry) {
-        return isRetry ? KeyBuilder.buildPopRetryTopic(topic, cid) : topic;
+    public static String getRealTopic(String topic, String cid, String retry) {
+        if (retry.equals(NORMAL_TOPIC)) {
+            return topic;
+        } else if (retry.equals(RETRY_TOPIC)) {
+            return KeyBuilder.buildPopRetryTopicV1(topic, cid);
+        } else if (retry.equals(RETRY_TOPIC_V2)) {
+            return KeyBuilder.buildPopRetryTopicV2(topic, cid);
+        } else {
+            throw new IllegalArgumentException("getRetry fail, format is 
wrong");
+        }
     }
 
     public static String getRetry(String[] extraInfoStrs) {
@@ -108,20 +119,14 @@ public class ExtraInfoUtil {
     }
 
     public static String buildExtraInfo(long ckQueueOffset, long popTime, long 
invisibleTime, int reviveQid, String topic, String brokerName, int queueId) {
-        String t = NORMAL_TOPIC;
-        if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-            t = RETRY_TOPIC;
-        }
+        String t = getRetry(topic);
         return ckQueueOffset + MessageConst.KEY_SEPARATOR + popTime + 
MessageConst.KEY_SEPARATOR + invisibleTime + MessageConst.KEY_SEPARATOR + 
reviveQid + MessageConst.KEY_SEPARATOR + t
             + MessageConst.KEY_SEPARATOR + brokerName + 
MessageConst.KEY_SEPARATOR + queueId;
     }
 
     public static String buildExtraInfo(long ckQueueOffset, long popTime, long 
invisibleTime, int reviveQid, String topic, String brokerName, int queueId,
                                         long msgQueueOffset) {
-        String t = NORMAL_TOPIC;
-        if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-            t = RETRY_TOPIC;
-        }
+        String t = getRetry(topic);
         return ckQueueOffset
             + MessageConst.KEY_SEPARATOR + popTime + 
MessageConst.KEY_SEPARATOR + invisibleTime
             + MessageConst.KEY_SEPARATOR + reviveQid + 
MessageConst.KEY_SEPARATOR + t
@@ -129,7 +134,7 @@ public class ExtraInfoUtil {
             + MessageConst.KEY_SEPARATOR + msgQueueOffset;
     }
 
-    public static void buildStartOffsetInfo(StringBuilder stringBuilder, 
boolean retry, int queueId, long startOffset) {
+    public static void buildStartOffsetInfo(StringBuilder stringBuilder, 
String topic, int queueId, long startOffset) {
         if (stringBuilder == null) {
             stringBuilder = new StringBuilder(64);
         }
@@ -138,12 +143,12 @@ public class ExtraInfoUtil {
             stringBuilder.append(";");
         }
 
-        stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+        stringBuilder.append(getRetry(topic))
             .append(MessageConst.KEY_SEPARATOR).append(queueId)
             .append(MessageConst.KEY_SEPARATOR).append(startOffset);
     }
 
-    public static void buildQueueIdOrderCountInfo(StringBuilder stringBuilder, 
boolean retry, int queueId, int orderCount) {
+    public static void buildQueueIdOrderCountInfo(StringBuilder stringBuilder, 
String topic, int queueId, int orderCount) {
         if (stringBuilder == null) {
             stringBuilder = new StringBuilder(64);
         }
@@ -152,12 +157,12 @@ public class ExtraInfoUtil {
             stringBuilder.append(";");
         }
 
-        stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+        stringBuilder.append(getRetry(topic))
                 .append(MessageConst.KEY_SEPARATOR).append(queueId)
                 .append(MessageConst.KEY_SEPARATOR).append(orderCount);
     }
 
-    public static void buildQueueOffsetOrderCountInfo(StringBuilder 
stringBuilder, boolean retry, long queueId, long queueOffset, int orderCount) {
+    public static void buildQueueOffsetOrderCountInfo(StringBuilder 
stringBuilder, String topic, long queueId, long queueOffset, int orderCount) {
         if (stringBuilder == null) {
             stringBuilder = new StringBuilder(64);
         }
@@ -166,12 +171,12 @@ public class ExtraInfoUtil {
             stringBuilder.append(";");
         }
 
-        stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+        stringBuilder.append(getRetry(topic))
             
.append(MessageConst.KEY_SEPARATOR).append(getQueueOffsetKeyValueKey(queueId, 
queueOffset))
             .append(MessageConst.KEY_SEPARATOR).append(orderCount);
     }
 
-    public static void buildMsgOffsetInfo(StringBuilder stringBuilder, boolean 
retry, int queueId, List<Long> msgOffsets) {
+    public static void buildMsgOffsetInfo(StringBuilder stringBuilder, String 
topic, int queueId, List<Long> msgOffsets) {
         if (stringBuilder == null) {
             stringBuilder = new StringBuilder(64);
         }
@@ -180,7 +185,7 @@ public class ExtraInfoUtil {
             stringBuilder.append(";");
         }
 
-        stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+        stringBuilder.append(getRetry(topic))
             .append(MessageConst.KEY_SEPARATOR).append(queueId)
             .append(MessageConst.KEY_SEPARATOR);
 
@@ -279,11 +284,11 @@ public class ExtraInfoUtil {
     }
 
     public static String getStartOffsetInfoMapKey(String topic, long key) {
-        return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? 
RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
+        return getRetry(topic) + "@" + key;
     }
 
     public static String getStartOffsetInfoMapKey(String topic, String popCk, 
long key) {
-        return ((topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || popCk != 
null) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
+        return getRetry(topic, popCk) + "@" + key;
     }
 
     public static String getQueueOffsetKeyValueKey(long queueId, long 
queueOffset) {
@@ -291,10 +296,27 @@ public class ExtraInfoUtil {
     }
 
     public static String getQueueOffsetMapKey(String topic, long queueId, long 
queueOffset) {
-        return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? 
RETRY_TOPIC : NORMAL_TOPIC) + "@" + getQueueOffsetKeyValueKey(queueId, 
queueOffset);
+        return getRetry(topic) + "@" + getQueueOffsetKeyValueKey(queueId, 
queueOffset);
     }
 
     public static boolean isOrder(String[] extraInfo) {
         return ExtraInfoUtil.getReviveQid(extraInfo) == 
KeyBuilder.POP_ORDER_REVIVE_QUEUE;
     }
+
+    private static String getRetry(String topic) {
+        String t = NORMAL_TOPIC;
+        if (KeyBuilder.isPopRetryTopicV2(topic)) {
+            t = RETRY_TOPIC_V2;
+        } else if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            t = RETRY_TOPIC;
+        }
+        return t;
+    }
+
+    private static String getRetry(String topic, String popCk) {
+        if (popCk != null) {
+            return getRetry(split(popCk));
+        }
+        return getRetry(topic);
+    }
 }
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtilTest.java
 
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtilTest.java
index 7cf258711c..8081f386cb 100644
--- 
a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtilTest.java
+++ 
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtilTest.java
@@ -36,8 +36,8 @@ public class ExtraInfoUtilTest {
         String queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, 
queueId, queueOffset);
 
         StringBuilder sb = new StringBuilder();
-        ExtraInfoUtil.buildQueueIdOrderCountInfo(sb, false, queueId, 
queueIdCount);
-        ExtraInfoUtil.buildQueueOffsetOrderCountInfo(sb, false, queueId, 
queueOffset, queueOffsetCount);
+        ExtraInfoUtil.buildQueueIdOrderCountInfo(sb, topic, queueId, 
queueIdCount);
+        ExtraInfoUtil.buildQueueOffsetOrderCountInfo(sb, topic, queueId, 
queueOffset, queueOffsetCount);
         Map<String, Integer> orderCountInfo = 
ExtraInfoUtil.parseOrderCountInfo(sb.toString());
 
         assertEquals(queueIdCount, orderCountInfo.get(queueIdKey));
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
index f20a748a6f..fe40e866a6 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
@@ -25,6 +25,7 @@ import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.BrokerIdentity;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -60,6 +61,7 @@ public class PopSlaveActingMasterIT extends 
ContainerIntegrationTestBase {
     private static DefaultMQProducer producer;
     private final static String MESSAGE_STRING = 
RandomStringUtils.random(1024);
     private static final byte[] MESSAGE_BODY = 
MESSAGE_STRING.getBytes(StandardCharsets.UTF_8);
+    private final BrokerConfig brokerConfig = new BrokerConfig();
 
     public PopSlaveActingMasterIT() {
     }
@@ -87,7 +89,7 @@ public class PopSlaveActingMasterIT extends 
ContainerIntegrationTestBase {
     public void testLocalActing_ackSlave() throws Exception {
         String topic = PopSlaveActingMasterIT.class.getSimpleName() + 
random.nextInt(65535);
         createTopic(topic);
-        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP);
+        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2());
         createTopic(retryTopic);
 
         this.switchPop(topic);
@@ -151,7 +153,7 @@ public class PopSlaveActingMasterIT extends 
ContainerIntegrationTestBase {
     public void testLocalActing_notAckSlave() throws Exception {
         String topic = PopSlaveActingMasterIT.class.getSimpleName() + 
random.nextInt(65535);
         createTopic(topic);
-        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP);
+        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2());
         createTopic(retryTopic);
 
         this.switchPop(topic);
@@ -231,7 +233,7 @@ public class PopSlaveActingMasterIT extends 
ContainerIntegrationTestBase {
     public void testRemoteActing_ackSlave() throws Exception {
         String topic = PopSlaveActingMasterIT.class.getSimpleName() + 
random.nextInt(65535);
         createTopic(topic);
-        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP);
+        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2());
         createTopic(retryTopic);
 
         switchPop(topic);
@@ -312,7 +314,7 @@ public class PopSlaveActingMasterIT extends 
ContainerIntegrationTestBase {
         createTopic(topic);
         this.switchPop(topic);
 
-        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP);
+        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2());
         createTopic(retryTopic);
 
         
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
@@ -399,7 +401,7 @@ public class PopSlaveActingMasterIT extends 
ContainerIntegrationTestBase {
         String topic = PopSlaveActingMasterIT.class.getSimpleName() + 
random.nextInt(65535);
         createTopic(topic);
         this.switchPop(topic);
-        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP);
+        String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2());
         createTopic(retryTopic);
 
         
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);

Reply via email to