This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

The following commit(s) were added to refs/heads/develop by this push:
     new 8a36471a19 [ISSUE #7543] Add enableRetryTopicV2 brokerConfig (#7734)
8a36471a19 is described below

commit 8a36471a19aea4a9052a2ad4508c9ed75ad0fe0d
Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com>
AuthorDate: Thu Jan 11 11:11:53 2024 +0800

    [ISSUE #7543] Add enableRetryTopicV2 brokerConfig (#7734)

    * Add enableRetryTopicV2
---
 .../broker/metrics/ConsumerLagCalculator.java | 4 +-
 .../broker/offset/ConsumerOrderInfoManager.java | 4 +-
 .../broker/processor/AckMessageProcessor.java | 2 +-
 .../broker/processor/AdminBrokerProcessor.java | 6 +-
 .../broker/processor/NotificationProcessor.java | 68 +++++++------
 .../broker/processor/PeekMessageProcessor.java | 12 ++-
 .../broker/processor/PopMessageProcessor.java | 105 +++++++++------------
 .../broker/processor/PopReviveService.java | 2 +-
 .../broker/processor/AdminBrokerProcessorTest.java | 7 +-
 .../rocketmq/client/impl/MQClientAPIImplTest.java | 8 +-
 .../org/apache/rocketmq/common/BrokerConfig.java | 9 ++
 .../org/apache/rocketmq/common/KeyBuilder.java | 11 +++
 .../rocketmq/common/consumer/ReceiptHandle.java | 11 ++-
 .../org/apache/rocketmq/common/KeyBuilderTest.java | 10 +-
 .../proxy/processor/ConsumerProcessorTest.java | 5 +-
 .../proxy/processor/ProducerProcessorTest.java | 3 +-
 .../service/message/LocalMessageServiceTest.java | 4 +-
 .../remoting/protocol/header/ExtraInfoUtil.java | 68 ++++++++-----
 .../protocol/header/ExtraInfoUtilTest.java | 4 +-
 .../test/container/PopSlaveActingMasterIT.java | 12 ++-
 20 files changed, 204 insertions(+), 151 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index d1f3fffde7..1930d0dfcb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -176,7 +176,7 @@ public class ConsumerLagCalculator { } if (isPop) { - String retryTopic = KeyBuilder.buildPopRetryTopic(topic, group); + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, group, brokerConfig.isEnableRetryTopicV2()); TopicConfig retryTopicConfig = topicConfigManager.selectTopicConfig(retryTopic); if (retryTopicConfig != null) { int retryTopicPerm = retryTopicConfig.getPerm() & brokerConfig.getBrokerPermission(); @@ -185,7 +185,7 @@ public class ConsumerLagCalculator { continue; } } - if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { + if (brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); TopicConfig retryTopicConfigV1 = topicConfigManager.selectTopicConfig(retryTopicV1); if (retryTopicConfigV1 != null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java index 2e2850dbbc..4eccc6c037 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java @@ -121,7 +121,7 @@ public class ConsumerOrderInfoManager extends ConfigManager { Set<Long> offsetSet = offsetConsumedCount.keySet(); for (Long offset : offsetSet) { Integer consumedTimes = offsetConsumedCount.getOrDefault(offset, 0); - ExtraInfoUtil.buildQueueOffsetOrderCountInfo(orderInfoBuilder, isRetry, queueId, offset, consumedTimes); + ExtraInfoUtil.buildQueueOffsetOrderCountInfo(orderInfoBuilder, topic, queueId, offset, consumedTimes); minConsumedTimes = Math.min(minConsumedTimes, consumedTimes); } @@ -136,7 +136,7 @@ public class ConsumerOrderInfoManager extends ConfigManager { // for compatibility // the old pop sdk use queueId to get consumedTimes from orderCountInfo - 
ExtraInfoUtil.buildQueueIdOrderCountInfo(orderInfoBuilder, isRetry, queueId, minConsumedTimes); + ExtraInfoUtil.buildQueueIdOrderCountInfo(orderInfoBuilder, topic, queueId, minConsumedTimes); updateLockFreeTimestamp(topic, group, queueId, orderInfo); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 59a3e63b2a..9a56498632 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -196,7 +196,7 @@ public class AckMessageProcessor implements NettyRequestProcessor { } else { // batch ack consumeGroup = batchAck.getConsumerGroup(); - topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), ExtraInfoUtil.RETRY_TOPIC.equals(batchAck.getRetry())); + topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), batchAck.getRetry()); qId = batchAck.getQueueId(); rqId = batchAck.getReviveQueueId(); startOffset = batchAck.getStartOffset(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 978c2e81d0..4cc6d53b15 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -555,9 +555,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { // delete pop retry topics first try { for (String group : groups) { - final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); - if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) { - deleteTopicInBroker(popRetryTopic); + final String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(topic, group, 
true); + if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV2) != null) { + deleteTopicInBroker(popRetryTopicV2); } final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 91d275dfe0..0deb3ee707 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.longpolling.PollingHeader; import org.apache.rocketmq.broker.longpolling.PollingResult; import org.apache.rocketmq.broker.longpolling.PopLongPollingService; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; @@ -121,44 +122,30 @@ public class NotificationProcessor implements NettyRequestProcessor { int randomQ = random.nextInt(100); boolean hasMsg = false; boolean needRetry = randomQ % 5 == 0; + BrokerConfig brokerConfig = brokerController.getBrokerConfig(); if (needRetry) { - TopicConfig retryTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); - if (retryTopicConfig != null) { - for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); - hasMsg = hasMsgFromQueue(true, requestHeader, queueId); - if (hasMsg) { - break; - } - } + String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); + hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader); + if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { + String retryTopicConfigV1 = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()); + hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader); } } if (!hasMsg) { if (requestHeader.getQueueId() < 0) { // read all queue - for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); - hasMsg = hasMsgFromQueue(false, requestHeader, queueId); - if (hasMsg) { - break; - } - } + hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader); } else { int queueId = requestHeader.getQueueId(); - hasMsg = hasMsgFromQueue(false, requestHeader, queueId); + hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId); } // if it doesn't have message, fetch retry again if (!needRetry && !hasMsg) { - TopicConfig retryTopicConfig = - this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); - if (retryTopicConfig != null) { - for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); - hasMsg = hasMsgFromQueue(true, requestHeader, queueId); - if (hasMsg) { - break; - } - } + String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); + hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader); + if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { + String retryTopicConfigV1 = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()); + hasMsg = 
hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader); } } } @@ -173,15 +160,34 @@ public class NotificationProcessor implements NettyRequestProcessor { return response; } - private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader requestHeader, int queueId) { + private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader) { + boolean hasMsg; + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicName); + return hasMsgFromTopic(topicConfig, randomQ, requestHeader); + } + + private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, NotificationRequestHeader requestHeader) { + boolean hasMsg; + if (topicConfig != null) { + for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { + int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); + hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId); + if (hasMsg) { + return true; + } + } + } + return false; + } + + private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader requestHeader, int queueId) { if (Boolean.TRUE.equals(requestHeader.getOrder())) { if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getAttemptId(), requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) { return false; } } - String topic = isRetry ? 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic(); - long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId); - long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset; + long offset = getPopOffset(targetTopic, requestHeader.getConsumerGroup(), queueId); + long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(targetTopic, queueId) - offset; return restNum > 0; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java index e1e0e13e53..a72759883c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -133,8 +134,10 @@ public class PeekMessageProcessor implements NettyRequestProcessor { boolean needRetry = randomQ % 5 == 0; long popTime = System.currentTimeMillis(); long restNum = 0; + BrokerConfig brokerConfig = brokerController.getBrokerConfig(); if (needRetry) { - TopicConfig retryTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); + TopicConfig retryTopicConfig = this.brokerController.getTopicConfigManager() + .selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), 
brokerConfig.isEnableRetryTopicV2())); if (retryTopicConfig != null) { for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); @@ -154,7 +157,8 @@ public class PeekMessageProcessor implements NettyRequestProcessor { } // if not full , fetch retry again if (!needRetry && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums()) { - TopicConfig retryTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); + TopicConfig retryTopicConfig = this.brokerController.getTopicConfigManager() + .selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2())); if (retryTopicConfig != null) { for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); @@ -226,7 +230,9 @@ public class PeekMessageProcessor implements NettyRequestProcessor { private long peekMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult, PeekMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid, Channel channel, long popTime) { - String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic(); + String topic = isRetry ? 
+ KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerController.getBrokerConfig().isEnableRetryTopicV2()) + : requestHeader.getTopic(); GetMessageResult getMessageTmpResult; long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId); restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 5d86ecc0cd..02e266a786 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.broker.longpolling.PopLongPollingService; import org.apache.rocketmq.broker.longpolling.PopRequest; import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PopAckConstants; @@ -289,6 +290,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { return response; } + BrokerConfig brokerConfig = brokerController.getBrokerConfig(); ExpressionMessageFilter messageFilter = null; if (requestHeader.getExp() != null && requestHeader.getExp().length() > 0) { try { @@ -296,7 +298,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData); - String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()); + String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); SubscriptionData retrySubscriptionData = FilterAPI.build(retryTopic, SubscriptionData.SUB_ALL, requestHeader.getExpType()); brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), retryTopic, retrySubscriptionData); @@ -330,7 +332,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData); - String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()); + String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); SubscriptionData retrySubscriptionData = FilterAPI.build(retryTopic, "*", ExpressionType.TAG); brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), retryTopic, retrySubscriptionData); @@ -359,47 +361,26 @@ public class PopMessageProcessor implements NettyRequestProcessor { // only one type of retry topic is able to call popMsgFromQueue. 
boolean needRetry = randomQ % 5 == 0; boolean needRetryV1 = false; - if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) { + if (brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { needRetryV1 = randomQ % 2 == 0; } long popTime = System.currentTimeMillis(); CompletableFuture<Long> getMessageFuture = CompletableFuture.completedFuture(0L); if (needRetry && !requestHeader.isOrder()) { if (needRetryV1) { - TopicConfig retryTopicConfigV1 = - this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup())); - if (retryTopicConfigV1 != null) { - for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> - popMsgFromQueue(retryTopicConfigV1.getTopicName(), requestHeader.getAttemptId(), true, - getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); - } - } + String retryTopic = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()); + getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel, + popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); } else { - TopicConfig retryTopicConfig = - this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); - if (retryTopicConfig != null) { - for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> - popMsgFromQueue(retryTopicConfig.getTopicName(), 
requestHeader.getAttemptId(), true, - getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); - } - } + String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); + getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel, + popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); } } if (requestHeader.getQueueId() < 0) { // read all queue - for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> - popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), false, - getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); - } + getMessageFuture = popMsgFromTopic(topicConfig, false, getMessageResult, requestHeader, reviveQid, channel, + popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); } else { int queueId = requestHeader.getQueueId(); getMessageFuture = getMessageFuture.thenCompose(restNum -> @@ -410,29 +391,13 @@ public class PopMessageProcessor implements NettyRequestProcessor { // if not full , fetch retry again if (!needRetry && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) { if (needRetryV1) { - TopicConfig retryTopicConfigV1 = - this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup())); - if (retryTopicConfigV1 != null) { - for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % 
retryTopicConfigV1.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> - popMsgFromQueue(retryTopicConfigV1.getTopicName(), requestHeader.getAttemptId(), true, - getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); - } - } + String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()); + getMessageFuture = popMsgFromTopic(retryTopicV1, true, getMessageResult, requestHeader, reviveQid, channel, + popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); } else { - TopicConfig retryTopicConfig = - this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); - if (retryTopicConfig != null) { - for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> - popMsgFromQueue(retryTopicConfig.getTopicName(), requestHeader.getAttemptId(), true, - getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); - } - } + String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); + getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel, + popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); } } @@ -513,11 +478,35 @@ public class PopMessageProcessor implements NettyRequestProcessor { return null; } - private CompletableFuture<Long> popMsgFromQueue(String targetTopic, String attemptId, boolean isRetry, GetMessageResult getMessageResult, + private 
CompletableFuture<Long> popMsgFromTopic(TopicConfig topicConfig, boolean isRetry, GetMessageResult getMessageResult, + PopMessageRequestHeader requestHeader, int reviveQid, Channel channel, long popTime, + ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, + StringBuilder msgOffsetInfo, StringBuilder orderCountInfo, int randomQ, CompletableFuture<Long> getMessageFuture) { + if (topicConfig != null) { + for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { + int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); + getMessageFuture = getMessageFuture.thenCompose(restNum -> + popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), isRetry, + getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter, + startOffsetInfo, msgOffsetInfo, orderCountInfo)); + } + } + return getMessageFuture; + } + + private CompletableFuture<Long> popMsgFromTopic(String topic, boolean isRetry, GetMessageResult getMessageResult, + PopMessageRequestHeader requestHeader, int reviveQid, Channel channel, long popTime, + ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, + StringBuilder msgOffsetInfo, StringBuilder orderCountInfo, int randomQ, CompletableFuture<Long> getMessageFuture) { + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); + return popMsgFromTopic(topicConfig, isRetry, getMessageResult, requestHeader, reviveQid, channel, popTime, + messageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); + } + + private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId, boolean isRetry, GetMessageResult getMessageResult, PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid, Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) { - String topic = targetTopic; String 
lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId; boolean isOrder = requestHeader.isOrder(); @@ -618,8 +607,8 @@ public class PopMessageProcessor implements NettyRequestProcessor { return atomicRestNum.get() + result.getMessageCount(); } } - ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, isRetry, queueId, finalOffset); - ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, isRetry, queueId, + ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, queueId, finalOffset); + ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, queueId, result.getMessageQueueOffset()); } else if ((GetMessageStatus.NO_MATCHED_MESSAGE.equals(result.getStatus()) || GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus()) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 8d25bc57e1..104b78d441 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -103,7 +103,7 @@ public class PopReviveService extends ServiceThread { private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); if (!popCheckPoint.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - msgInner.setTopic(KeyBuilder.buildPopRetryTopic(popCheckPoint.getTopic(), popCheckPoint.getCId())); + msgInner.setTopic(KeyBuilder.buildPopRetryTopic(popCheckPoint.getTopic(), popCheckPoint.getCId(), brokerController.getBrokerConfig().isEnableRetryTopicV2())); } else { msgInner.setTopic(popCheckPoint.getTopic()); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index c6b889baee..e4fcc690d1 
100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -278,15 +278,16 @@ public class AdminBrokerProcessorTest { public void testDeleteWithPopRetryTopic() throws Exception { String topic = "topicA"; String anotherTopic = "another_topicA"; + BrokerConfig brokerConfig = new BrokerConfig(); topicConfigManager = mock(TopicConfigManager.class); when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(); topicConfigTable.put(topic, new TopicConfig()); - topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new TopicConfig()); + topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1", brokerConfig.isEnableRetryTopicV2()), new TopicConfig()); topicConfigTable.put(anotherTopic, new TopicConfig()); - topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2"), new TopicConfig()); + topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2", brokerConfig.isEnableRetryTopicV2()), new TopicConfig()); when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable); when(topicConfigManager.selectTopicConfig(anyString())).thenAnswer(invocation -> { final String selectTopic = invocation.getArgument(0); @@ -301,7 +302,7 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); verify(topicConfigManager).deleteTopicConfig(topic); - verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1")); + verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1", brokerConfig.isEnableRetryTopicV2())); verify(messageStore, times(2)).deleteTopics(anySet()); } diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index cf399802ba..9f1a71c92f 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -446,10 +446,10 @@ public class MQClientAPIImplTest { responseHeader.setReviveQid(0); responseHeader.setRestNum(1); StringBuilder startOffsetInfo = new StringBuilder(64); - ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, false, 0, 0L); + ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 0L); responseHeader.setStartOffsetInfo(startOffsetInfo.toString()); StringBuilder msgOffsetInfo = new StringBuilder(64); - ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, false, 0, Collections.singletonList(0L)); + ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, Collections.singletonList(0L)); responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString()); response.setRemark("FOUND"); response.makeCustomHeaderToNet(); @@ -515,10 +515,10 @@ public class MQClientAPIImplTest { responseHeader.setReviveQid(0); responseHeader.setRestNum(1); StringBuilder startOffsetInfo = new StringBuilder(64); - ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, false, 0, 0L); + ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 0L); responseHeader.setStartOffsetInfo(startOffsetInfo.toString()); StringBuilder msgOffsetInfo = new StringBuilder(64); - ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, false, 0, Collections.singletonList(0L)); + ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, Collections.singletonList(0L)); responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString()); response.setRemark("FOUND"); response.makeCustomHeaderToNet(); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index a4a553ad5b..bedc7f386b 100644 --- 
a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -225,6 +225,7 @@ public class BrokerConfig extends BrokerIdentity { private boolean initPopOffsetByCheckMsgInMem = true; // read message from pop retry topic v1, for the compatibility, will be removed in the future version private boolean retrieveMessageFromPopRetryTopicV1 = true; + private boolean enableRetryTopicV2 = false; private boolean realTimeNotifyConsumerChange = true; @@ -1309,6 +1310,14 @@ public class BrokerConfig extends BrokerIdentity { this.retrieveMessageFromPopRetryTopicV1 = retrieveMessageFromPopRetryTopicV1; } + public boolean isEnableRetryTopicV2() { + return enableRetryTopicV2; + } + + public void setEnableRetryTopicV2(boolean enableRetryTopicV2) { + this.enableRetryTopicV2 = enableRetryTopicV2; + } + public boolean isRealTimeNotifyConsumerChange() { return realTimeNotifyConsumerChange; } diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java index 0f77c96ab0..910a73b713 100644 --- a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java +++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java @@ -22,7 +22,18 @@ public class KeyBuilder { private static final char POP_RETRY_SEPARATOR_V2 = '+'; private static final String POP_RETRY_REGEX_SEPARATOR_V2 = "\\+"; + public static String buildPopRetryTopic(String topic, String cid, boolean enableRetryV2) { + if (enableRetryV2) { + return buildPopRetryTopicV2(topic, cid); + } + return buildPopRetryTopicV1(topic, cid); + } + public static String buildPopRetryTopic(String topic, String cid) { + return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V1 + topic; + } + + public static String buildPopRetryTopicV2(String topic, String cid) { return MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + POP_RETRY_SEPARATOR_V2 + topic; } diff --git 
a/common/src/main/java/org/apache/rocketmq/common/consumer/ReceiptHandle.java b/common/src/main/java/org/apache/rocketmq/common/consumer/ReceiptHandle.java index 392a3ae339..daaaee5600 100644 --- a/common/src/main/java/org/apache/rocketmq/common/consumer/ReceiptHandle.java +++ b/common/src/main/java/org/apache/rocketmq/common/consumer/ReceiptHandle.java @@ -26,6 +26,8 @@ public class ReceiptHandle { private static final String SEPARATOR = MessageConst.KEY_SEPARATOR; public static final String NORMAL_TOPIC = "0"; public static final String RETRY_TOPIC = "1"; + + public static final String RETRY_TOPIC_V2 = "2"; private final long startOffset; private final long retrieveTime; private final long invisibleTime; @@ -220,12 +222,15 @@ public class ReceiptHandle { } public boolean isRetryTopic() { - return RETRY_TOPIC.equals(topicType); + return RETRY_TOPIC.equals(topicType) || RETRY_TOPIC_V2.equals(topicType); } public String getRealTopic(String topic, String groupName) { - if (isRetryTopic()) { - return KeyBuilder.buildPopRetryTopic(topic, groupName); + if (RETRY_TOPIC.equals(topicType)) { + return KeyBuilder.buildPopRetryTopicV1(topic, groupName); + } + if (RETRY_TOPIC_V2.equals(topicType)) { + return KeyBuilder.buildPopRetryTopicV2(topic, groupName); } return topic; } diff --git a/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java index 3c75871eaf..47191c907f 100644 --- a/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/KeyBuilderTest.java @@ -27,7 +27,7 @@ public class KeyBuilderTest { @Test public void testBuildPopRetryTopic() { - assertThat(KeyBuilder.buildPopRetryTopic(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + "+" + topic); + assertThat(KeyBuilder.buildPopRetryTopicV2(topic, group)).isEqualTo(MixAll.RETRY_GROUP_TOPIC_PREFIX + group + "+" + topic); } @Test @@ -37,25 +37,25 @@ public 
class KeyBuilderTest { @Test public void testParseNormalTopic() { - String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); + String popRetryTopic = KeyBuilder.buildPopRetryTopicV2(topic, group); assertThat(KeyBuilder.parseNormalTopic(popRetryTopic, group)).isEqualTo(topic); String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); assertThat(KeyBuilder.parseNormalTopic(popRetryTopicV1, group)).isEqualTo(topic); - popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); + popRetryTopic = KeyBuilder.buildPopRetryTopicV2(topic, group); assertThat(KeyBuilder.parseNormalTopic(popRetryTopic)).isEqualTo(topic); } @Test public void testParseGroup() { - String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); + String popRetryTopic = KeyBuilder.buildPopRetryTopicV2(topic, group); assertThat(KeyBuilder.parseGroup(popRetryTopic)).isEqualTo(group); } @Test public void testIsPopRetryTopicV2() { - String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); + String popRetryTopic = KeyBuilder.buildPopRetryTopicV2(topic, group); assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopic)).isEqualTo(true); String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); assertThat(KeyBuilder.isPopRetryTopicV2(popRetryTopicV1)).isEqualTo(false); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java index db268a06e6..f154033e45 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.client.consumer.PopResult; import org.apache.rocketmq.client.consumer.PopStatus; import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.common.BrokerConfig; import 
org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.ConsumeInitMode; @@ -169,7 +170,7 @@ public class ConsumerProcessorTest extends BaseProcessorTest { CONSUMER_GROUP, TOPIC, 3000).get(); assertEquals(AckStatus.OK, ackResult.getStatus()); - assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP), requestHeaderArgumentCaptor.getValue().getTopic()); + assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new BrokerConfig().isEnableRetryTopicV2()), requestHeaderArgumentCaptor.getValue().getTopic()); assertEquals(CONSUMER_GROUP, requestHeaderArgumentCaptor.getValue().getConsumerGroup()); assertEquals(handle.getReceiptHandle(), requestHeaderArgumentCaptor.getValue().getExtraInfo()); } @@ -292,7 +293,7 @@ public class ConsumerProcessorTest extends BaseProcessorTest { CONSUMER_GROUP, TOPIC, 1000, 3000).get(); assertEquals(AckStatus.OK, ackResult.getStatus()); - assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP), requestHeaderArgumentCaptor.getValue().getTopic()); + assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new BrokerConfig().isEnableRetryTopicV2()), requestHeaderArgumentCaptor.getValue().getTopic()); assertEquals(CONSUMER_GROUP, requestHeaderArgumentCaptor.getValue().getConsumerGroup()); assertEquals(1000, requestHeaderArgumentCaptor.getValue().getInvisibleTime().longValue()); assertEquals(handle.getReceiptHandle(), requestHeaderArgumentCaptor.getValue().getExtraInfo()); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java index de63b7e75f..af61c80315 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.Executors; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.attribute.TopicMessageType; @@ -185,7 +186,7 @@ public class ProducerProcessorTest extends BaseProcessorTest { when(this.messageService.sendMessageBack(any(), any(), anyString(), requestHeaderArgumentCaptor.capture(), anyLong())) .thenReturn(CompletableFuture.completedFuture(mock(RemotingCommand.class))); - MessageExt messageExt = createMessageExt(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP), "", 16, 3000); + MessageExt messageExt = createMessageExt(KeyBuilder.buildPopRetryTopic(TOPIC, CONSUMER_GROUP, new BrokerConfig().isEnableRetryTopicV2()), "", 16, 3000); RemotingCommand remotingCommand = this.producerProcessor.forwardMessageToDeadLetterQueue( createContext(), create(messageExt), diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java index 51fea167d0..e959244dec 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java @@ -286,8 +286,8 @@ public class LocalMessageServiceTest extends InitConfigTest { MessageExt message2 = buildMessageExt(topic, 0, startOffset + 1); messageExtList.add(message2); messageOffsetList.add(startOffset + 1); - ExtraInfoUtil.buildStartOffsetInfo(startOffsetStringBuilder, false, queueId, startOffset); - ExtraInfoUtil.buildMsgOffsetInfo(messageOffsetStringBuilder, false, queueId, messageOffsetList); + ExtraInfoUtil.buildStartOffsetInfo(startOffsetStringBuilder, topic, queueId, startOffset); + 
ExtraInfoUtil.buildMsgOffsetInfo(messageOffsetStringBuilder, topic, queueId, messageOffsetList); byte[] body2 = MessageDecoder.encode(message2, false); ByteBuffer byteBuffer1 = ByteBuffer.wrap(body1); ByteBuffer byteBuffer2 = ByteBuffer.wrap(body2); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java index 13094331e6..a6a4a77767 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java @@ -26,7 +26,8 @@ import org.apache.rocketmq.common.message.MessageConst; public class ExtraInfoUtil { private static final String NORMAL_TOPIC = "0"; - public static final String RETRY_TOPIC = "1"; + private static final String RETRY_TOPIC = "1"; + private static final String RETRY_TOPIC_V2 = "2"; private static final String QUEUE_OFFSET = "qo"; public static String[] split(String extraInfo) { @@ -69,14 +70,24 @@ public class ExtraInfoUtil { throw new IllegalArgumentException("getRealTopic fail, extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length)); } if (RETRY_TOPIC.equals(extraInfoStrs[4])) { - return KeyBuilder.buildPopRetryTopic(topic, cid); + return KeyBuilder.buildPopRetryTopicV1(topic, cid); + } else if (RETRY_TOPIC_V2.equals(extraInfoStrs[4])) { + return KeyBuilder.buildPopRetryTopicV2(topic, cid); } else { return topic; } } - public static String getRealTopic(String topic, String cid, boolean isRetry) { - return isRetry ? 
KeyBuilder.buildPopRetryTopic(topic, cid) : topic; + public static String getRealTopic(String topic, String cid, String retry) { + if (retry.equals(NORMAL_TOPIC)) { + return topic; + } else if (retry.equals(RETRY_TOPIC)) { + return KeyBuilder.buildPopRetryTopicV1(topic, cid); + } else if (retry.equals(RETRY_TOPIC_V2)) { + return KeyBuilder.buildPopRetryTopicV2(topic, cid); + } else { + throw new IllegalArgumentException("getRetry fail, format is wrong"); + } } public static String getRetry(String[] extraInfoStrs) { @@ -108,20 +119,14 @@ public class ExtraInfoUtil { } public static String buildExtraInfo(long ckQueueOffset, long popTime, long invisibleTime, int reviveQid, String topic, String brokerName, int queueId) { - String t = NORMAL_TOPIC; - if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - t = RETRY_TOPIC; - } + String t = getRetry(topic); return ckQueueOffset + MessageConst.KEY_SEPARATOR + popTime + MessageConst.KEY_SEPARATOR + invisibleTime + MessageConst.KEY_SEPARATOR + reviveQid + MessageConst.KEY_SEPARATOR + t + MessageConst.KEY_SEPARATOR + brokerName + MessageConst.KEY_SEPARATOR + queueId; } public static String buildExtraInfo(long ckQueueOffset, long popTime, long invisibleTime, int reviveQid, String topic, String brokerName, int queueId, long msgQueueOffset) { - String t = NORMAL_TOPIC; - if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - t = RETRY_TOPIC; - } + String t = getRetry(topic); return ckQueueOffset + MessageConst.KEY_SEPARATOR + popTime + MessageConst.KEY_SEPARATOR + invisibleTime + MessageConst.KEY_SEPARATOR + reviveQid + MessageConst.KEY_SEPARATOR + t @@ -129,7 +134,7 @@ public class ExtraInfoUtil { + MessageConst.KEY_SEPARATOR + msgQueueOffset; } - public static void buildStartOffsetInfo(StringBuilder stringBuilder, boolean retry, int queueId, long startOffset) { + public static void buildStartOffsetInfo(StringBuilder stringBuilder, String topic, int queueId, long startOffset) { if (stringBuilder == null) { stringBuilder 
= new StringBuilder(64); } @@ -138,12 +143,12 @@ public class ExtraInfoUtil { stringBuilder.append(";"); } - stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC) + stringBuilder.append(getRetry(topic)) .append(MessageConst.KEY_SEPARATOR).append(queueId) .append(MessageConst.KEY_SEPARATOR).append(startOffset); } - public static void buildQueueIdOrderCountInfo(StringBuilder stringBuilder, boolean retry, int queueId, int orderCount) { + public static void buildQueueIdOrderCountInfo(StringBuilder stringBuilder, String topic, int queueId, int orderCount) { if (stringBuilder == null) { stringBuilder = new StringBuilder(64); } @@ -152,12 +157,12 @@ public class ExtraInfoUtil { stringBuilder.append(";"); } - stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC) + stringBuilder.append(getRetry(topic)) .append(MessageConst.KEY_SEPARATOR).append(queueId) .append(MessageConst.KEY_SEPARATOR).append(orderCount); } - public static void buildQueueOffsetOrderCountInfo(StringBuilder stringBuilder, boolean retry, long queueId, long queueOffset, int orderCount) { + public static void buildQueueOffsetOrderCountInfo(StringBuilder stringBuilder, String topic, long queueId, long queueOffset, int orderCount) { if (stringBuilder == null) { stringBuilder = new StringBuilder(64); } @@ -166,12 +171,12 @@ public class ExtraInfoUtil { stringBuilder.append(";"); } - stringBuilder.append(retry ? 
RETRY_TOPIC : NORMAL_TOPIC) + stringBuilder.append(getRetry(topic)) .append(MessageConst.KEY_SEPARATOR).append(getQueueOffsetKeyValueKey(queueId, queueOffset)) .append(MessageConst.KEY_SEPARATOR).append(orderCount); } - public static void buildMsgOffsetInfo(StringBuilder stringBuilder, boolean retry, int queueId, List<Long> msgOffsets) { + public static void buildMsgOffsetInfo(StringBuilder stringBuilder, String topic, int queueId, List<Long> msgOffsets) { if (stringBuilder == null) { stringBuilder = new StringBuilder(64); } @@ -180,7 +185,7 @@ public class ExtraInfoUtil { stringBuilder.append(";"); } - stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC) + stringBuilder.append(getRetry(topic)) .append(MessageConst.KEY_SEPARATOR).append(queueId) .append(MessageConst.KEY_SEPARATOR); @@ -279,11 +284,11 @@ public class ExtraInfoUtil { } public static String getStartOffsetInfoMapKey(String topic, long key) { - return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key; + return getRetry(topic) + "@" + key; } public static String getStartOffsetInfoMapKey(String topic, String popCk, long key) { - return ((topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || popCk != null) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key; + return getRetry(topic, popCk) + "@" + key; } public static String getQueueOffsetKeyValueKey(long queueId, long queueOffset) { @@ -291,10 +296,27 @@ public class ExtraInfoUtil { } public static String getQueueOffsetMapKey(String topic, long queueId, long queueOffset) { - return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? 
RETRY_TOPIC : NORMAL_TOPIC) + "@" + getQueueOffsetKeyValueKey(queueId, queueOffset); + return getRetry(topic) + "@" + getQueueOffsetKeyValueKey(queueId, queueOffset); } public static boolean isOrder(String[] extraInfo) { return ExtraInfoUtil.getReviveQid(extraInfo) == KeyBuilder.POP_ORDER_REVIVE_QUEUE; } + + private static String getRetry(String topic) { + String t = NORMAL_TOPIC; + if (KeyBuilder.isPopRetryTopicV2(topic)) { + t = RETRY_TOPIC_V2; + } else if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + t = RETRY_TOPIC; + } + return t; + } + + private static String getRetry(String topic, String popCk) { + if (popCk != null) { + return getRetry(split(popCk)); + } + return getRetry(topic); + } } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtilTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtilTest.java index 7cf258711c..8081f386cb 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtilTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtilTest.java @@ -36,8 +36,8 @@ public class ExtraInfoUtilTest { String queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, queueId, queueOffset); StringBuilder sb = new StringBuilder(); - ExtraInfoUtil.buildQueueIdOrderCountInfo(sb, false, queueId, queueIdCount); - ExtraInfoUtil.buildQueueOffsetOrderCountInfo(sb, false, queueId, queueOffset, queueOffsetCount); + ExtraInfoUtil.buildQueueIdOrderCountInfo(sb, topic, queueId, queueIdCount); + ExtraInfoUtil.buildQueueOffsetOrderCountInfo(sb, topic, queueId, queueOffset, queueOffsetCount); Map<String, Integer> orderCountInfo = ExtraInfoUtil.parseOrderCountInfo(sb.toString()); assertEquals(queueIdCount, orderCountInfo.get(queueIdKey)); diff --git a/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java 
b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java index f20a748a6f..fe40e866a6 100644 --- a/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerIdentity; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -60,6 +61,7 @@ public class PopSlaveActingMasterIT extends ContainerIntegrationTestBase { private static DefaultMQProducer producer; private final static String MESSAGE_STRING = RandomStringUtils.random(1024); private static final byte[] MESSAGE_BODY = MESSAGE_STRING.getBytes(StandardCharsets.UTF_8); + private final BrokerConfig brokerConfig = new BrokerConfig(); public PopSlaveActingMasterIT() { } @@ -87,7 +89,7 @@ public class PopSlaveActingMasterIT extends ContainerIntegrationTestBase { public void testLocalActing_ackSlave() throws Exception { String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535); createTopic(topic); - String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP); + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2()); createTopic(retryTopic); this.switchPop(topic); @@ -151,7 +153,7 @@ public class PopSlaveActingMasterIT extends ContainerIntegrationTestBase { public void testLocalActing_notAckSlave() throws Exception { String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535); createTopic(topic); - String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP); + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2()); createTopic(retryTopic); this.switchPop(topic); @@ -231,7 +233,7 @@ public class PopSlaveActingMasterIT extends ContainerIntegrationTestBase { public void testRemoteActing_ackSlave() throws Exception { String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535); createTopic(topic); - String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP); + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2()); createTopic(retryTopic); switchPop(topic); @@ -312,7 +314,7 @@ public class PopSlaveActingMasterIT extends ContainerIntegrationTestBase { createTopic(topic); this.switchPop(topic); - String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP); + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2()); createTopic(retryTopic); producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic); @@ -399,7 +401,7 @@ public class PopSlaveActingMasterIT extends ContainerIntegrationTestBase { String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535); createTopic(topic); this.switchPop(topic); - String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP); + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP, brokerConfig.isEnableRetryTopicV2()); createTopic(retryTopic); producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);