This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 71a7a659be [ISSUE #7543] only call a single type of retry topic in pop (#7665) 71a7a659be is described below commit 71a7a659bed15110d1146091bfb7a51d28ade562 Author: Zhouxiang Zhan <zhouxz...@apache.org> AuthorDate: Fri Dec 15 16:09:11 2023 +0800 [ISSUE #7543] only call a single type of retry topic in pop (#7665) * only call a single type of retry topic in pop --- .../broker/processor/PopMessageProcessor.java | 85 ++++++++++++++-------- .../broker/processor/PopMessageProcessorTest.java | 2 +- 2 files changed, 55 insertions(+), 32 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 58baecc05a..5d86ecc0cd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -351,27 +351,42 @@ public class PopMessageProcessor implements NettyRequestProcessor { ExpressionMessageFilter finalMessageFilter = messageFilter; StringBuilder finalOrderCountInfo = orderCountInfo; + // Due to the design of the fields startOffsetInfo, msgOffsetInfo, and orderCountInfo, + // a single POP request could only invoke the popMsgFromQueue method once + // for either a normal topic or a retry topic's queue. Retry topics v1 and v2 are + // considered the same type because they share the same retry flag in previous fields. + // Therefore, needRetryV1 is designed as a subset of needRetry, and within a single request, + // only one type of retry topic is able to call popMsgFromQueue. boolean needRetry = randomQ % 5 == 0; + boolean needRetryV1 = false; + if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) { + needRetryV1 = randomQ % 2 == 0; + } long popTime = System.currentTimeMillis(); CompletableFuture<Long> getMessageFuture = CompletableFuture.completedFuture(0L); if (needRetry && !requestHeader.isOrder()) { - TopicConfig retryTopicConfig = - this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); - if (retryTopicConfig != null) { - for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); - } - } - if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) { + if (needRetryV1) { TopicConfig retryTopicConfigV1 = this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup())); if (retryTopicConfigV1 != null) { for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) { int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); + getMessageFuture = getMessageFuture.thenCompose(restNum -> + popMsgFromQueue(retryTopicConfigV1.getTopicName(), requestHeader.getAttemptId(), true, + getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); + } + } + } else { + TopicConfig retryTopicConfig = + this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); + if (retryTopicConfig != null) { + for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { + int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); + getMessageFuture = getMessageFuture.thenCompose(restNum -> + popMsgFromQueue(retryTopicConfig.getTopicName(), requestHeader.getAttemptId(), true, + getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); } } } @@ -380,33 +395,42 @@ public class PopMessageProcessor implements NettyRequestProcessor { // read all queue for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); + getMessageFuture = getMessageFuture.thenCompose(restNum -> + popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), false, + getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); } } else { int queueId = requestHeader.getQueueId(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); + getMessageFuture = getMessageFuture.thenCompose(restNum -> + popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), false, + getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); } // if not full , fetch retry again if (!needRetry && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) { - TopicConfig retryTopicConfig = - this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); - if (retryTopicConfig != null) { - for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); - } - } - if (brokerController.getBrokerConfig().isRetrieveMessageFromPopRetryTopicV1()) { + if (needRetryV1) { TopicConfig retryTopicConfigV1 = this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup())); if (retryTopicConfigV1 != null) { for (int i = 0; i < retryTopicConfigV1.getReadQueueNums(); i++) { int queueId = (randomQ + i) % retryTopicConfigV1.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); + getMessageFuture = getMessageFuture.thenCompose(restNum -> + popMsgFromQueue(retryTopicConfigV1.getTopicName(), requestHeader.getAttemptId(), true, + getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); + } + } + } else { + TopicConfig retryTopicConfig = + this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup())); + if (retryTopicConfig != null) { + for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { + int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); + getMessageFuture = getMessageFuture.thenCompose(restNum -> + popMsgFromQueue(retryTopicConfig.getTopicName(), requestHeader.getAttemptId(), true, + getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); } } } @@ -489,12 +513,11 @@ public class PopMessageProcessor implements NettyRequestProcessor { return null; } - private CompletableFuture<Long> popMsgFromQueue(String attemptId, boolean isRetry, GetMessageResult getMessageResult, + private CompletableFuture<Long> popMsgFromQueue(String targetTopic, String attemptId, boolean isRetry, GetMessageResult getMessageResult, PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid, Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) { - String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), - requestHeader.getConsumerGroup()) : requestHeader.getTopic(); + String topic = targetTopic; String lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId; boolean isOrder = requestHeader.isOrder(); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java index 44f04066ca..d8c8fa1034 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java @@ -76,7 +76,7 @@ public class PopMessageProcessorTest { brokerController.getBrokerConfig().setEnablePopBufferMerge(true); popMessageProcessor = new PopMessageProcessor(brokerController); when(handlerContext.channel()).thenReturn(embeddedChannel); - brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig()); + brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig(topic)); clientChannelInfo = new ClientChannelInfo(embeddedChannel); ConsumerData consumerData = createConsumerData(group, topic); brokerController.getConsumerManager().registerConsumer(