This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new aab646ce83 [ISSUE #8460] Improve the pop revive process when reading 
biz messages from a remote broker (#8475)
aab646ce83 is described below

commit aab646ce83930454d8b5956779aa28a80e326e24
Author: imzs <i...@foxmail.com>
AuthorDate: Fri Aug 2 14:43:21 2024 +0800

    [ISSUE #8460] Improve the pop revive process when reading biz messages from 
a remote broker (#8475)
    
    * [ISSUE #8460] part1: add extra information to the call chain of remote 
message reading
    
    * [ISSUE #8460] part2: add exponential backoff and ending condition of CK 
rewrite, and fix checkstyle
    
    * [ISSUE #8460] exclude test, BrokerOuterAPITest passed locally, but failed 
on bazel.
---
 broker/BUILD.bazel                                 |   1 +
 .../rocketmq/broker/failover/EscapeBridge.java     |  44 +++---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  13 +-
 .../broker/processor/PopReviveService.java         |  41 +++--
 .../apache/rocketmq/broker/BrokerOuterAPITest.java | 173 ++++++++++++++++++++-
 .../rocketmq/broker/failover/EscapeBridgeTest.java | 150 +++++++++++++++++-
 .../broker/processor/PopReviveServiceTest.java     | 166 +++++++++++++++++++-
 .../apache/rocketmq/store/pop/PopCheckPoint.java   |  23 ++-
 8 files changed, 554 insertions(+), 57 deletions(-)

diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel
index 0dbc85f945..66e621e930 100644
--- a/broker/BUILD.bazel
+++ b/broker/BUILD.bazel
@@ -100,6 +100,7 @@ GenTestRules(
     exclude_tests = [
             # These tests are extremely slow and flaky, exclude them before 
they are properly fixed.
             
"src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest",
+            "src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest",
         ],
     deps = [
         ":tests",
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java 
b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index ededaf2c65..7df49f8c47 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -25,7 +25,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
 import org.apache.rocketmq.client.consumer.PullStatus;
@@ -34,7 +36,6 @@ import 
org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -47,7 +48,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
@@ -263,34 +263,29 @@ public class EscapeBridge {
         }
     }
 
-    public Pair<GetMessageStatus, MessageExt> getMessage(String topic, long 
offset, int queueId, String brokerName, boolean deCompressBody) {
+    public Triple<MessageExt, String, Boolean> getMessage(String topic, long 
offset, int queueId, String brokerName, boolean deCompressBody) {
         return getMessageAsync(topic, offset, queueId, brokerName, 
deCompressBody).join();
     }
 
-    public CompletableFuture<Pair<GetMessageStatus, MessageExt>> 
getMessageAsync(String topic, long offset, int queueId, String brokerName, 
boolean deCompressBody) {
+    // Triple<MessageExt, info, needRetry>, check info and retry if and only 
if MessageExt is null
+    public CompletableFuture<Triple<MessageExt, String, Boolean>> 
getMessageAsync(String topic, long offset, int queueId, String brokerName, 
boolean deCompressBody) {
         MessageStore messageStore = 
brokerController.getMessageStoreByBrokerName(brokerName);
         if (messageStore != null) {
             return messageStore.getMessageAsync(innerConsumerGroupName, topic, 
queueId, offset, 1, null)
                 .thenApply(result -> {
                     if (result == null) {
                         LOG.warn("getMessageResult is null , 
innerConsumerGroupName {}, topic {}, offset {}, queueId {}", 
innerConsumerGroupName, topic, offset, queueId);
-                        return new 
Pair<>(GetMessageStatus.MESSAGE_WAS_REMOVING, null);
+                        return Triple.of(null, "getMessageResult is null", 
false); // local store, so no retry
                     }
                     List<MessageExt> list = decodeMsgList(result, 
deCompressBody);
                     if (list == null || list.isEmpty()) {
                         LOG.warn("Can not get msg , topic {}, offset {}, 
queueId {}, result is {}", topic, offset, queueId, result);
-                        return new Pair<>(result.getStatus(), null);
+                        return Triple.of(null, "Can not get msg", false); // 
local store, so no retry
                     }
-                    return new Pair<>(result.getStatus(), list.get(0));
+                    return Triple.of(list.get(0), "", false);
                 });
         } else {
-            return getMessageFromRemoteAsync(topic, offset, queueId, 
brokerName)
-                .thenApply(msg -> {
-                    if (msg == null) {
-                        return new 
Pair<>(GetMessageStatus.MESSAGE_WAS_REMOVING, null);
-                    }
-                    return new Pair<>(GetMessageStatus.FOUND, msg);
-                });
+            return getMessageFromRemoteAsync(topic, offset, queueId, 
brokerName);
         }
     }
 
@@ -322,11 +317,12 @@ public class EscapeBridge {
         return foundList;
     }
 
-    protected MessageExt getMessageFromRemote(String topic, long offset, int 
queueId, String brokerName) {
+    protected Triple<MessageExt, String, Boolean> getMessageFromRemote(String 
topic, long offset, int queueId, String brokerName) {
         return getMessageFromRemoteAsync(topic, offset, queueId, 
brokerName).join();
     }
 
-    protected CompletableFuture<MessageExt> getMessageFromRemoteAsync(String 
topic, long offset, int queueId, String brokerName) {
+    // Triple<MessageExt, info, needRetry>, check info and retry if and only 
if MessageExt is null
+    protected CompletableFuture<Triple<MessageExt, String, Boolean>> 
getMessageFromRemoteAsync(String topic, long offset, int queueId, String 
brokerName) {
         try {
             String brokerAddr = 
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName,
 MixAll.MASTER_ID, false);
             if (null == brokerAddr) {
@@ -334,23 +330,25 @@ public class EscapeBridge {
                 brokerAddr = 
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName,
 MixAll.MASTER_ID, false);
 
                 if (null == brokerAddr) {
-                    LOG.warn("can't find broker address for topic {}", topic);
-                    return CompletableFuture.completedFuture(null);
+                    LOG.warn("can't find broker address for topic {}, {}", 
topic, brokerName);
+                    return CompletableFuture.completedFuture(Triple.of(null, 
"brokerAddress not found", true)); // maybe offline temporarily, so need retry
                 }
             }
 
             return 
this.brokerController.getBrokerOuterAPI().pullMessageFromSpecificBrokerAsync(brokerName,
                 brokerAddr, this.innerConsumerGroupName, topic, queueId, 
offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS)
                 .thenApply(pullResult -> {
-                    if (pullResult.getPullStatus().equals(PullStatus.FOUND) && 
!pullResult.getMsgFoundList().isEmpty()) {
-                        return pullResult.getMsgFoundList().get(0);
+                    if (pullResult.getLeft() != null
+                            && 
PullStatus.FOUND.equals(pullResult.getLeft().getPullStatus())
+                            && 
CollectionUtils.isNotEmpty(pullResult.getLeft().getMsgFoundList())) {
+                        return 
Triple.of(pullResult.getLeft().getMsgFoundList().get(0), "", false);
                     }
-                    return null;
+                    return Triple.of(null, pullResult.getMiddle(), 
pullResult.getRight());
                 });
         } catch (Exception e) {
-            LOG.error("Get message from remote failed.", e);
+            LOG.error("Get message from remote failed. {}, {}, {}, {}", topic, 
offset, queueId, brokerName, e);
         }
 
-        return CompletableFuture.completedFuture(null);
+        return CompletableFuture.completedFuture(Triple.of(null, "Get message 
from remote failed", true)); // need retry
     }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index d5c80ce2ec..83edd88408 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.auth.config.AuthConfig;
@@ -1378,7 +1379,8 @@ public class BrokerOuterAPI {
         });
     }
 
-    public CompletableFuture<PullResult> 
pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr,
+    // Triple<PullResult, info, needRetry>, should check info and retry if and 
only if PullResult is null
+    public CompletableFuture<Triple<PullResult, String, Boolean>> 
pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr,
         String consumerGroup, String topic, int queueId, long offset,
         int maxNums, long timeoutMillis) throws RemotingException, 
InterruptedException {
         PullMessageRequestHeader requestHeader = new 
PullMessageRequestHeader();
@@ -1397,7 +1399,7 @@ public class BrokerOuterAPI {
         requestHeader.setBrokerName(brokerName);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
-        CompletableFuture<PullResult> pullResultFuture = new 
CompletableFuture<>();
+        CompletableFuture<Triple<PullResult, String, Boolean>> 
pullResultFuture = new CompletableFuture<>();
         this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, 
new InvokeCallback() {
             @Override
             public void operationComplete(ResponseFuture responseFuture) {
@@ -1409,15 +1411,16 @@ public class BrokerOuterAPI {
                 try {
                     PullResultExt pullResultExt = 
processPullResponse(response, brokerAddr);
                     processPullResult(pullResultExt, brokerName, queueId);
-                    pullResultFuture.complete(pullResultExt);
+                    pullResultFuture.complete(Triple.of(pullResultExt, 
pullResultExt.getPullStatus().name(), false)); // found or not found really, so 
no retry
                 } catch (Exception e) {
-                    pullResultFuture.complete(new 
PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
+                    // retry when NO_PERMISSION, SUBSCRIPTION_GROUP_NOT_EXIST 
etc. even when TOPIC_NOT_EXIST
+                    pullResultFuture.complete(Triple.of(null, "Response Code:" 
+ response.getCode(), true));
                 }
             }
 
             @Override
             public void operationFail(Throwable throwable) {
-                pullResultFuture.complete(new 
PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
+                pullResultFuture.complete(Triple.of(null, 
throwable.getMessage(), true));
             }
         });
         return pullResultFuture;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 8074af23bf..114d094600 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.metrics.PopMetricsManager;
@@ -34,6 +35,7 @@ import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.ServiceThread;
@@ -51,7 +53,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.AppendMessageStatus;
 import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.BatchAckMsg;
@@ -63,6 +64,7 @@ import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOP
 
 public class PopReviveService extends ServiceThread {
     private static final Logger POP_LOGGER = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
+    private final int[] ckRewriteIntervalsInSeconds = new int[] { 10, 20, 30, 
60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200 };
 
     private int queueId;
     private BrokerController brokerController;
@@ -196,7 +198,8 @@ public class PopReviveService extends ServiceThread {
             || pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL && 
offset == pullResult.getMaxOffset();
     }
 
-    private CompletableFuture<Pair<GetMessageStatus, MessageExt>> 
getBizMessage(String topic, long offset, int queueId,
+    // Triple<MessageExt, info, needRetry>
+    private CompletableFuture<Triple<MessageExt, String, Boolean>> 
getBizMessage(String topic, long offset, int queueId,
         String brokerName) {
         return this.brokerController.getEscapeBridge().getMessageAsync(topic, 
offset, queueId, brokerName, false);
     }
@@ -491,6 +494,8 @@ public class PopReviveService extends ServiceThread {
                     PopCheckPoint oldCK = inflightReviveRequestMap.firstKey();
                     rePutCK(oldCK, pair);
                     inflightReviveRequestMap.remove(oldCK);
+                    POP_LOGGER.warn("stay too long, remove from 
reviveRequestMap, {}, {}, {}, {}", popCheckPoint.getTopic(),
+                            popCheckPoint.getBrokerName(), 
popCheckPoint.getQueueId(), popCheckPoint.getStartOffset());
                 }
             }
 
@@ -524,22 +529,12 @@ public class PopReviveService extends ServiceThread {
             // retry msg
             long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
             CompletableFuture<Pair<Long, Boolean>> future = 
getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), 
popCheckPoint.getBrokerName())
-                .thenApply(resultPair -> {
-                    GetMessageStatus getMessageStatus = 
resultPair.getObject1();
-                    MessageExt message = resultPair.getObject2();
+                .thenApply(rst -> {
+                    MessageExt message = rst.getLeft();
                     if (message == null) {
-                        POP_LOGGER.debug("reviveQueueId={}, can not get biz 
msg topic is {}, offset is {}, then continue",
-                            queueId, popCheckPoint.getTopic(), msgOffset);
-                        switch (getMessageStatus) {
-                            case MESSAGE_WAS_REMOVING:
-                            case OFFSET_TOO_SMALL:
-                            case NO_MATCHED_LOGIC_QUEUE:
-                            case NO_MESSAGE_IN_QUEUE:
-                                return new Pair<>(msgOffset, true);
-                            default:
-                                return new Pair<>(msgOffset, false);
-
-                        }
+                        POP_LOGGER.info("reviveQueueId={}, can not get biz 
msg, topic:{}, qid:{}, offset:{}, brokerName:{}, info:{}, retry:{}, then 
continue",
+                            queueId, popCheckPoint.getTopic(), 
popCheckPoint.getQueueId(), msgOffset, popCheckPoint.getBrokerName(), 
UtilAll.frontStringAtLeast(rst.getMiddle(), 60), rst.getRight());
+                        return new Pair<>(msgOffset, !rst.getRight()); // 
Pair.object2 means OK or not, Triple.right value means needRetry
                     }
                     boolean result = reviveRetry(popCheckPoint, message);
                     return new Pair<>(msgOffset, result);
@@ -572,6 +567,13 @@ public class PopReviveService extends ServiceThread {
     }
 
     private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
+        int rePutTimes = oldCK.parseRePutTimes();
+        if (rePutTimes >= ckRewriteIntervalsInSeconds.length) {
+            POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, 
{}-{}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
+                    oldCK.getBrokerName(), oldCK.getQueueId(), 
pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime());
+            return;
+        }
+
         PopCheckPoint newCk = new PopCheckPoint();
         newCk.setBitMap(0);
         newCk.setNum((byte) 1);
@@ -583,6 +585,11 @@ public class PopReviveService extends ServiceThread {
         newCk.setQueueId(oldCK.getQueueId());
         newCk.setBrokerName(oldCK.getBrokerName());
         newCk.addDiff(0);
+        newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always 
increment even if removed from reviveRequestMap
+        if (oldCK.getReviveTime() <= System.currentTimeMillis()) {
+            // never expect an ACK matched in the future, we just use it to 
rewrite CK and try to revive retry message next time
+            newCk.setInvisibleTime(oldCK.getInvisibleTime() + 
ckRewriteIntervalsInSeconds[rePutTimes] * 1000);
+        }
         MessageExtBrokerInner ckMsg = 
brokerController.getPopMessageProcessor().buildCkMsg(newCk, queueId);
         brokerController.getMessageStore().putMessage(ckMsg);
     }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java 
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
index 8f89c14ae9..440ebf813b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -20,29 +20,43 @@ package org.apache.rocketmq.broker;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.auth.config.AuthConfig;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.BrokerIdentity;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
@@ -56,9 +70,12 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Spy;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
-import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -69,9 +86,11 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.AdditionalMatchers.or;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
-@RunWith(MockitoJUnitRunner.class)
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(NettyRemotingClient.class)
 public class BrokerOuterAPITest {
     @Mock
     private ChannelHandlerContext handlerContext;
@@ -251,4 +270,154 @@ public class BrokerOuterAPITest {
         });
         Assert.assertTrue(result.get());
     }
+
+    @Test
+    public void testPullMessageFromSpecificBrokerAsync_createChannel_null() 
throws Exception {
+        NettyRemotingClient mockClient = PowerMockito.spy(new 
NettyRemotingClient(new NettyClientConfig()));
+        PowerMockito.when(mockClient, "getAndCreateChannelAsync", 
any()).thenReturn(null);
+        BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new 
AuthConfig());
+        Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+        field.setAccessible(true);
+        field.set(api, mockClient);
+
+        Triple<PullResult, String, Boolean> rst = 
api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join();
+        Assert.assertNull(rst.getLeft());
+        Assert.assertTrue(rst.getMiddle().contains("connect"));
+        Assert.assertTrue(rst.getRight()); // need retry
+    }
+
+    @Test
+    public void 
testPullMessageFromSpecificBrokerAsync_createChannel_future_notSuccess() throws 
Exception {
+        NettyRemotingClient mockClient = PowerMockito.spy(new 
NettyRemotingClient(new NettyClientConfig()));
+        DefaultChannelPromise promise = PowerMockito.spy(new 
DefaultChannelPromise(PowerMockito.mock(Channel.class), new 
DefaultEventExecutor()));
+        PowerMockito.when(mockClient, "getAndCreateChannelAsync", 
any()).thenReturn(promise);
+        BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new 
AuthConfig());
+        Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+        field.setAccessible(true);
+        field.set(api, mockClient);
+
+        promise.tryFailure(new Throwable());
+        Triple<PullResult, String, Boolean> rst
+                = api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 
1, 3000L).join();
+        Assert.assertNull(rst.getLeft());
+        Assert.assertTrue(rst.getMiddle().contains("connect"));
+        Assert.assertTrue(rst.getRight()); // need retry
+    }
+
+    // skip other future status test
+
+    @Test
+    public void testPullMessageFromSpecificBrokerAsync_timeout() throws 
Exception {
+        Channel channel = Mockito.mock(Channel.class);
+        when(channel.isActive()).thenReturn(true);
+        NettyRemotingClient mockClient = PowerMockito.spy(new 
NettyRemotingClient(new NettyClientConfig()));
+        DefaultChannelPromise promise = PowerMockito.spy(new 
DefaultChannelPromise(PowerMockito.mock(Channel.class), new 
DefaultEventExecutor()));
+        PowerMockito.when(mockClient, "getAndCreateChannelAsync", 
any()).thenReturn(promise);
+        when(promise.channel()).thenReturn(channel);
+        BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new 
AuthConfig());
+        Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+        field.setAccessible(true);
+        field.set(api, mockClient);
+
+        CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+        doReturn(future).when(mockClient).invokeImpl(any(Channel.class), 
any(RemotingCommand.class), anyLong());
+        promise.trySuccess(null);
+        future.completeExceptionally(new RemotingTimeoutException("wait 
response on the channel timeout"));
+        Triple<PullResult, String, Boolean> rst = 
api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join();
+        Assert.assertNull(rst.getLeft());
+        Assert.assertTrue(rst.getMiddle().contains("timeout"));
+        Assert.assertTrue(rst.getRight()); // need retry
+    }
+
+    @Test
+    public void 
testPullMessageFromSpecificBrokerAsync_brokerReturn_pullStatusCode() throws 
Exception {
+        Channel channel = Mockito.mock(Channel.class);
+        when(channel.isActive()).thenReturn(true);
+        NettyRemotingClient mockClient = PowerMockito.spy(new 
NettyRemotingClient(new NettyClientConfig()));
+        DefaultChannelPromise promise = PowerMockito.spy(new 
DefaultChannelPromise(PowerMockito.mock(Channel.class), new 
DefaultEventExecutor()));
+        PowerMockito.when(mockClient, "getAndCreateChannelAsync", 
any()).thenReturn(promise);
+        when(promise.channel()).thenReturn(channel);
+        BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new 
AuthConfig());
+        Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+        field.setAccessible(true);
+        field.set(api, mockClient);
+
+        int[] respCodes = new int[] {ResponseCode.SUCCESS, 
ResponseCode.PULL_NOT_FOUND, ResponseCode.PULL_RETRY_IMMEDIATELY, 
ResponseCode.PULL_OFFSET_MOVED};
+        PullStatus[] respStatus = new PullStatus[] {PullStatus.FOUND, 
PullStatus.NO_NEW_MSG, PullStatus.NO_MATCHED_MSG, PullStatus.OFFSET_ILLEGAL};
+        for (int i = 0; i < respCodes.length; i++) {
+            CompletableFuture<ResponseFuture> future = new 
CompletableFuture<>();
+            doReturn(future).when(mockClient).invokeImpl(any(Channel.class), 
any(RemotingCommand.class), anyLong());
+            RemotingCommand response = mockPullMessageResponse(respCodes[i]);
+            ResponseFuture responseFuture = new ResponseFuture(channel, 0, 
null, 1000,
+                    resp -> { }, new SemaphoreReleaseOnlyOnce(new 
Semaphore(1)));
+            responseFuture.setResponseCommand(response);
+            promise.trySuccess(null);
+            future.complete(responseFuture);
+
+            Triple<PullResult, String, Boolean> rst = 
api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join();
+            Assert.assertEquals(respStatus[i], rst.getLeft().getPullStatus());
+            if (ResponseCode.SUCCESS == respCodes[i]) {
+                Assert.assertEquals(1, rst.getLeft().getMsgFoundList().size());
+            } else {
+                Assert.assertNull(rst.getLeft().getMsgFoundList());
+            }
+            Assert.assertEquals(respStatus[i].name(), rst.getMiddle());
+            Assert.assertFalse(rst.getRight()); // no retry
+        }
+    }
+
+    @Test
+    public void 
testPullMessageFromSpecificBrokerAsync_brokerReturn_allOtherResponseCode() 
throws Exception {
+        Channel channel = Mockito.mock(Channel.class);
+        when(channel.isActive()).thenReturn(true);
+        NettyRemotingClient mockClient = PowerMockito.spy(new 
NettyRemotingClient(new NettyClientConfig()));
+        DefaultChannelPromise promise = PowerMockito.spy(new 
DefaultChannelPromise(PowerMockito.mock(Channel.class), new 
DefaultEventExecutor()));
+        PowerMockito.when(mockClient, "getAndCreateChannelAsync", 
any()).thenReturn(promise);
+        when(promise.channel()).thenReturn(channel);
+        BrokerOuterAPI api = new BrokerOuterAPI(new NettyClientConfig(), new 
AuthConfig());
+        Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+        field.setAccessible(true);
+        field.set(api, mockClient);
+
+        CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+        doReturn(future).when(mockClient).invokeImpl(any(Channel.class), 
any(RemotingCommand.class), anyLong());
+        // test one code here, skip others
+        RemotingCommand response = 
mockPullMessageResponse(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+        ResponseFuture responseFuture = new ResponseFuture(channel, 0, null, 
1000,
+                resp -> { }, new SemaphoreReleaseOnlyOnce(new Semaphore(1)));
+        responseFuture.setResponseCommand(response);
+        promise.trySuccess(null);
+        future.complete(responseFuture);
+
+        Triple<PullResult, String, Boolean> rst = 
api.pullMessageFromSpecificBrokerAsync("", "", "", "", 1, 1, 1, 3000L).join();
+        Assert.assertNull(rst.getLeft());
+        
Assert.assertTrue(rst.getMiddle().contains(ResponseCode.SUBSCRIPTION_NOT_EXIST 
+ ""));
+        Assert.assertTrue(rst.getRight()); // need retry
+    }
+
+    private RemotingCommand mockPullMessageResponse(int responseCode) throws 
Exception {
+        RemotingCommand response = 
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+        response.setCode(responseCode);
+        if (responseCode == ResponseCode.SUCCESS) {
+            MessageExt msg = new MessageExt();
+            msg.setBody("HW".getBytes());
+            msg.setTopic("topic");
+            msg.setBornHost(new InetSocketAddress("127.0.0.1", 9000));
+            msg.setStoreHost(new InetSocketAddress("127.0.0.1", 9000));
+            byte[] encode = MessageDecoder.encode(msg, false);
+            response.setBody(encode);
+        }
+        PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) 
response.readCustomHeader();
+        responseHeader.setNextBeginOffset(0L);
+        responseHeader.setMaxOffset(0L);
+        responseHeader.setMinOffset(0L);
+        responseHeader.setOffsetDelta(0L);
+        responseHeader.setTopicSysFlag(0);
+        responseHeader.setGroupSysFlag(0);
+        responseHeader.setSuggestWhichBrokerId(0L);
+        responseHeader.setForbiddenType(0);
+        response.makeCustomHeaderToNet();
+        return response;
+    }
+
 }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
index d7bd753d77..7ea06665c3 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
@@ -17,10 +17,14 @@
 
 package org.apache.rocketmq.broker.failover;
 
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
@@ -28,7 +32,10 @@ import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.PutMessageResult;
@@ -38,6 +45,7 @@ import org.apache.rocketmq.store.logfile.DefaultMappedFile;
 import org.apache.rocketmq.store.logfile.MappedFile;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -49,7 +57,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -73,6 +80,12 @@ public class EscapeBridgeTest {
     @Mock
     private DefaultMQProducer defaultMQProducer;
 
+    @Mock
+    private TopicRouteInfoManager topicRouteInfoManager;
+
+    @Mock
+    private BrokerOuterAPI brokerOuterAPI;
+
     private static final String BROKER_NAME = "broker_a";
 
     private static final String TEST_TOPIC = "TEST_TOPIC";
@@ -92,14 +105,10 @@ public class EscapeBridgeTest {
         
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
         when(defaultMessageStore.getMessageAsync(anyString(), anyString(), 
anyInt(), anyLong(), anyInt(), 
any())).thenReturn(CompletableFuture.completedFuture(getMessageResult));
 
-        TopicRouteInfoManager topicRouteInfoManager = 
mock(TopicRouteInfoManager.class);
         
when(brokerController.getTopicRouteInfoManager()).thenReturn(topicRouteInfoManager);
         when(topicRouteInfoManager.findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean())).thenReturn("");
 
-        BrokerOuterAPI brokerOuterAPI = mock(BrokerOuterAPI.class);
         when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
-        when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(), 
anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(), 
anyLong()))
-            .thenReturn(CompletableFuture.completedFuture(new 
PullResult(PullStatus.FOUND, -1, -1, -1, new ArrayList<>())));
 
         brokerConfig.setEnableSlaveActingMaster(true);
         brokerConfig.setEnableRemoteEscape(true);
@@ -179,6 +188,52 @@ public class EscapeBridgeTest {
         Assertions.assertThatCode(() -> 
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, 
false)).doesNotThrowAnyException();
     }
 
+    @Test
+    public void getMessageAsyncTest_localStore_getMessageAsync_null() {
+        
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore);
+        when(defaultMessageStore.getMessageAsync(anyString(), anyString(), 
anyInt(), anyLong(), anyInt(), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        Triple<MessageExt, String, Boolean> rst = 
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, 
false).join();
+        Assert.assertNull(rst.getLeft());
+        Assert.assertEquals("getMessageResult is null", rst.getMiddle());
+        Assert.assertFalse(rst.getRight()); // no retry
+    }
+
+    @Test
+    public void getMessageAsyncTest_localStore_decodeNothing() throws 
Exception {
+        
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore);
+        when(defaultMessageStore.getMessageAsync(anyString(), anyString(), 
anyInt(), anyLong(), anyInt(), any()))
+                
.thenReturn(CompletableFuture.completedFuture(mockGetMessageResult(0, 
TEST_TOPIC, null)));
+        Triple<MessageExt, String, Boolean> rst = 
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, 
false).join();
+        Assert.assertNull(rst.getLeft());
+        Assert.assertEquals("Can not get msg", rst.getMiddle());
+        Assert.assertFalse(rst.getRight()); // no retry
+    }
+
+    @Test
+    public void getMessageAsyncTest_localStore_message_found() throws 
Exception {
+        
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore);
+        when(defaultMessageStore.getMessageAsync(anyString(), anyString(), 
anyInt(), anyLong(), anyInt(), any()))
+                
.thenReturn(CompletableFuture.completedFuture(mockGetMessageResult(2, 
TEST_TOPIC, "HW".getBytes())));
+        Triple<MessageExt, String, Boolean> rst = 
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, 
false).join();
+        Assert.assertNotNull(rst.getLeft());
+        Assert.assertEquals(0, rst.getLeft().getQueueOffset());
+        Assert.assertTrue(Arrays.equals("HW".getBytes(), 
rst.getLeft().getBody()));
+        Assert.assertFalse(rst.getRight());
+    }
+
+    @Test
+    public void getMessageAsyncTest_remoteStore_addressNotFound() throws 
Exception {
+        
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(null);
+
+        // just test address not found, since we have complete tests of 
getMessageFromRemoteAsync()
+        when(topicRouteInfoManager.findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean())).thenReturn(null);
+        Triple<MessageExt, String, Boolean> rst = 
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, 
false).join();
+        Assert.assertNull(rst.getLeft());
+        Assert.assertEquals("brokerAddress not found", rst.getMiddle());
+        Assert.assertTrue(rst.getRight()); // need retry
+    }
+
     @Test
     public void getMessageFromRemoteTest() {
         Assertions.assertThatCode(() -> 
escapeBridge.getMessageFromRemote(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, 
BROKER_NAME)).doesNotThrowAnyException();
@@ -189,6 +244,54 @@ public class EscapeBridgeTest {
         Assertions.assertThatCode(() -> 
escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, 
BROKER_NAME)).doesNotThrowAnyException();
     }
 
+    @Test
+    public void getMessageFromRemoteAsyncTest_exception_caught() throws 
Exception {
+        when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(), 
anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(), 
anyLong()))
+                .thenThrow(new RemotingException("mock remoting exception"));
+        Triple<MessageExt, String, Boolean> rst = 
escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, 
BROKER_NAME).join();
+        Assert.assertNull(rst.getLeft());
+        Assert.assertEquals("Get message from remote failed", rst.getMiddle());
+        Assert.assertTrue(rst.getRight()); // need retry
+    }
+
+    @Test
+    public void getMessageFromRemoteAsyncTest_brokerAddressNotFound() throws 
Exception {
+        when(topicRouteInfoManager.findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean())).thenReturn(null);
+        Triple<MessageExt, String, Boolean> rst = 
escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, 
BROKER_NAME).join();
+        Assert.assertNull(rst.getLeft());
+        Assert.assertEquals("brokerAddress not found", rst.getMiddle());
+        Assert.assertTrue(rst.getRight()); // need retry
+    }
+
+    @Test
+    public void getMessageFromRemoteAsyncTest_message_found() throws Exception 
{
+        PullResult pullResult = new PullResult(PullStatus.FOUND, 1, 1, 1, 
Arrays.asList(new MessageExt()));
+        when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(), 
anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(), 
anyLong()))
+                
.thenReturn(CompletableFuture.completedFuture(Triple.of(pullResult, "", 
false))); // right value is ignored
+        Triple<MessageExt, String, Boolean> rst = 
escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, 
BROKER_NAME).join();
+        Assert.assertNotNull(rst.getLeft());
+        Assert.assertTrue(StringUtils.isEmpty(rst.getMiddle()));
+        Assert.assertFalse(rst.getRight()); // no retry
+    }
+
+    @Test
+    public void getMessageFromRemoteAsyncTest_message_notFound() throws 
Exception {
+        PullResult pullResult = new PullResult(PullStatus.NO_MATCHED_MSG, 1, 
1, 1, null);
+        when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(), 
anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(), 
anyLong()))
+                
.thenReturn(CompletableFuture.completedFuture(Triple.of(pullResult, "no msg", 
false)));
+        Triple<MessageExt, String, Boolean> rst = 
escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, DEFAULT_QUEUE_ID, 
BROKER_NAME).join();
+        Assert.assertNull(rst.getLeft());
+        Assert.assertEquals("no msg", rst.getMiddle());
+        Assert.assertFalse(rst.getRight()); // no retry
+
+        when(brokerOuterAPI.pullMessageFromSpecificBrokerAsync(anyString(), 
anyString(), anyString(), anyString(), anyInt(), anyLong(), anyInt(), 
anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(Triple.of(null, 
"other resp code", true)));
+        rst = escapeBridge.getMessageFromRemoteAsync(TEST_TOPIC, 1, 
DEFAULT_QUEUE_ID, BROKER_NAME).join();
+        Assert.assertNull(rst.getLeft());
+        Assert.assertEquals("other resp code", rst.getMiddle());
+        Assert.assertTrue(rst.getRight()); // need retry
+    }
+
     @Test
     public void decodeMsgListTest() {
         ByteBuffer byteBuffer = ByteBuffer.allocate(10);
@@ -199,4 +302,39 @@ public class EscapeBridgeTest {
         Assertions.assertThatCode(() -> 
escapeBridge.decodeMsgList(getMessageResult, false)).doesNotThrowAnyException();
     }
 
+    @Test
+    public void decodeMsgListTest_messageNotNull() throws Exception {
+        MessageExt msg = new MessageExt();
+        msg.setBody("HW".getBytes());
+        msg.setTopic("topic");
+        msg.setBornHost(new InetSocketAddress("127.0.0.1", 9000));
+        msg.setStoreHost(new InetSocketAddress("127.0.0.1", 9000));
+        ByteBuffer byteBuffer = ByteBuffer.wrap(MessageDecoder.encode(msg, 
false));
+        SelectMappedBufferResult result = new SelectMappedBufferResult(0, 
byteBuffer, 10, new DefaultMappedFile());
+
+
+        getMessageResult.addMessage(result);
+        getMessageResult.getMessageQueueOffset().add(0L);
+        List<MessageExt> list = escapeBridge.decodeMsgList(getMessageResult, 
false); // skip deCompressBody test
+        Assert.assertEquals(1, list.size());
+        Assert.assertTrue(Arrays.equals(msg.getBody(), list.get(0).getBody()));
+    }
+
+    private GetMessageResult mockGetMessageResult(int count, String topic, 
byte[] body) throws Exception {
+        GetMessageResult result = new GetMessageResult();
+        for (int i = 0; i < count; i++) {
+            MessageExt msg = new MessageExt();
+            msg.setBody(body);
+            msg.setTopic(topic);
+            msg.setBornHost(new InetSocketAddress("127.0.0.1", 9000));
+            msg.setStoreHost(new InetSocketAddress("127.0.0.1", 9000));
+            ByteBuffer byteBuffer = ByteBuffer.wrap(MessageDecoder.encode(msg, 
false));
+            SelectMappedBufferResult bufferResult = new 
SelectMappedBufferResult(0, byteBuffer, body.length, new DefaultMappedFile());
+
+            result.addMessage(bufferResult);
+            result.getMessageQueueOffset().add(i + 0L);
+        }
+        return result;
+    }
+
 }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
index 78b76264fe..d7ea97c550 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
@@ -20,12 +20,17 @@ import com.alibaba.fastjson.JSON;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.failover.EscapeBridge;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -36,9 +41,14 @@ import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,19 +60,25 @@ import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.times;
 
 @RunWith(MockitoJUnitRunner.Silent.class)
 public class PopReviveServiceTest {
 
-    private static final String REVIVE_TOPIC = PopAckConstants.REVIVE_TOPIC + 
"test";
+    private static final String CLUSTER_NAME = "test";
+    private static final String REVIVE_TOPIC = 
PopAckConstants.buildClusterReviveTopic(CLUSTER_NAME);
     private static final int REVIVE_QUEUE_ID = 0;
     private static final String GROUP = "group";
     private static final String TOPIC = "topic";
     private static final SocketAddress STORE_HOST = 
NetworkUtil.string2SocketAddress("127.0.0.1:8080");
+    private static final Long INVISIBLE_TIME = 1000L;
 
     @Mock
     private MessageStore messageStore;
@@ -76,6 +92,9 @@ public class PopReviveServiceTest {
     private SubscriptionGroupManager subscriptionGroupManager;
     @Mock
     private BrokerController brokerController;
+    @Mock
+    private EscapeBridge escapeBridge;
+    private PopMessageProcessor popMessageProcessor;
 
     private BrokerConfig brokerConfig;
     private PopReviveService popReviveService;
@@ -83,12 +102,14 @@ public class PopReviveServiceTest {
     @Before
     public void before() {
         brokerConfig = new BrokerConfig();
-
+        brokerConfig.setBrokerClusterName(CLUSTER_NAME);
+        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
         when(brokerController.getMessageStore()).thenReturn(messageStore);
         
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
         
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
+        when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
         
when(messageStore.getTimerMessageStore()).thenReturn(timerMessageStore);
         when(timerMessageStore.getDequeueBehind()).thenReturn(0L);
         when(timerMessageStore.getEnqueueBehind()).thenReturn(0L);
@@ -96,6 +117,9 @@ public class PopReviveServiceTest {
         when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(new 
TopicConfig());
         
when(subscriptionGroupManager.findSubscriptionGroupConfig(anyString())).thenReturn(new
 SubscriptionGroupConfig());
 
+        popMessageProcessor = new PopMessageProcessor(brokerController); // a 
real one, not mock
+        
when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor);
+
         popReviveService = spy(new PopReviveService(brokerController, 
REVIVE_TOPIC, REVIVE_QUEUE_ID));
         popReviveService.setShouldRunPopRevive(true);
     }
@@ -204,6 +228,141 @@ public class PopReviveServiceTest {
         assertEquals(maxReviveOffset, 
commitOffsetCaptor.getValue().longValue());
     }
 
+    @Test
+    public void testReviveMsgFromCk_messageFound_writeRetryOK() throws 
Throwable {
+        PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+        PopReviveService.ConsumeReviveObj reviveObj = new 
PopReviveService.ConsumeReviveObj();
+        reviveObj.map.put("", ck);
+        reviveObj.endTime = System.currentTimeMillis();
+        StringBuilder actualRetryTopic = new StringBuilder();
+
+        when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), 
anyString(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(Triple.of(new 
MessageExt(), "", false)));
+        
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation
 -> {
+            MessageExtBrokerInner msg = invocation.getArgument(0);
+            actualRetryTopic.append(msg.getTopic());
+            return new PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK));
+        });
+
+        popReviveService.mergeAndRevive(reviveObj);
+        Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP, 
false), actualRetryTopic.toString());
+        verify(escapeBridge, 
times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write 
retry
+        verify(messageStore, 
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+    }
+
+    @Test
+    public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK() 
throws Throwable {
+        PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+        PopReviveService.ConsumeReviveObj reviveObj = new 
PopReviveService.ConsumeReviveObj();
+        reviveObj.map.put("", ck);
+        reviveObj.endTime = System.currentTimeMillis();
+        StringBuilder actualRetryTopic = new StringBuilder();
+        StringBuilder actualReviveTopic = new StringBuilder();
+        AtomicLong actualInvisibleTime = new AtomicLong(0L);
+
+        when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), 
anyString(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(Triple.of(new 
MessageExt(), "", false)));
+        
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation
 -> {
+            MessageExtBrokerInner msg = invocation.getArgument(0);
+            actualRetryTopic.append(msg.getTopic());
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new 
AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED));
+        });
+        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenAnswer(invocation
 -> {
+            MessageExtBrokerInner msg = invocation.getArgument(0);
+            actualReviveTopic.append(msg.getTopic());
+            PopCheckPoint rewriteCK = JSON.parseObject(msg.getBody(), 
PopCheckPoint.class);
+            actualInvisibleTime.set(rewriteCK.getReviveTime());
+            return new PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK));
+        });
+
+        popReviveService.mergeAndRevive(reviveObj);
+        Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP, 
false), actualRetryTopic.toString());
+        Assert.assertEquals(REVIVE_TOPIC, actualReviveTopic.toString());
+        Assert.assertEquals(INVISIBLE_TIME + 10 * 1000L, 
actualInvisibleTime.get()); // first interval is 10s
+        verify(escapeBridge, 
times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write 
retry
+        verify(messageStore, 
times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+    }
+
+    @Test
+    public void 
testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_end() throws 
Throwable {
+        PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+        ck.setRePutTimes("17");
+        PopReviveService.ConsumeReviveObj reviveObj = new 
PopReviveService.ConsumeReviveObj();
+        reviveObj.map.put("", ck);
+        reviveObj.endTime = System.currentTimeMillis();
+        StringBuilder actualRetryTopic = new StringBuilder();
+
+        when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), 
anyString(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(Triple.of(new 
MessageExt(), "", false)));
+        
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation
 -> {
+            MessageExtBrokerInner msg = invocation.getArgument(0);
+            actualRetryTopic.append(msg.getTopic());
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new 
AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED));
+        });
+
+        popReviveService.mergeAndRevive(reviveObj);
+        Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP, 
false), actualRetryTopic.toString());
+        verify(escapeBridge, 
times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write 
retry
+        verify(messageStore, 
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+    }
+
+    @Test
+    public void testReviveMsgFromCk_messageNotFound_noRetry() throws Throwable 
{
+        PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+        PopReviveService.ConsumeReviveObj reviveObj = new 
PopReviveService.ConsumeReviveObj();
+        reviveObj.map.put("", ck);
+        reviveObj.endTime = System.currentTimeMillis();
+
+        when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), 
anyString(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(Triple.of(null, 
"", false)));
+
+        popReviveService.mergeAndRevive(reviveObj);
+        verify(escapeBridge, 
times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write 
retry
+        verify(messageStore, 
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+    }
+
+    @Test
+    public void testReviveMsgFromCk_messageNotFound_needRetry() throws 
Throwable {
+        PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+        PopReviveService.ConsumeReviveObj reviveObj = new 
PopReviveService.ConsumeReviveObj();
+        reviveObj.map.put("", ck);
+        reviveObj.endTime = System.currentTimeMillis();
+        StringBuilder actualReviveTopic = new StringBuilder();
+        AtomicLong actualInvisibleTime = new AtomicLong(0L);
+
+        when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), 
anyString(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(Triple.of(null, 
"", true)));
+        
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenAnswer(invocation
 -> {
+            MessageExtBrokerInner msg = invocation.getArgument(0);
+            actualReviveTopic.append(msg.getTopic());
+            PopCheckPoint rewriteCK = JSON.parseObject(msg.getBody(), 
PopCheckPoint.class);
+            actualInvisibleTime.set(rewriteCK.getReviveTime());
+            return new PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK));
+        });
+
+        popReviveService.mergeAndRevive(reviveObj);
+        Assert.assertEquals(REVIVE_TOPIC, actualReviveTopic.toString());
+        Assert.assertEquals(INVISIBLE_TIME + 10 * 1000L, 
actualInvisibleTime.get()); // first interval is 10s
+        verify(escapeBridge, 
times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write 
retry
+        verify(messageStore, 
times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+    }
+
+    @Test
+    public void testReviveMsgFromCk_messageNotFound_needRetry_end() throws 
Throwable {
+        PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+        ck.setRePutTimes("17");
+        PopReviveService.ConsumeReviveObj reviveObj = new 
PopReviveService.ConsumeReviveObj();
+        reviveObj.map.put("", ck);
+        reviveObj.endTime = System.currentTimeMillis();
+
+        when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), 
anyString(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(Triple.of(null, 
"", true)));
+
+        popReviveService.mergeAndRevive(reviveObj);
+        verify(escapeBridge, 
times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write 
retry
+        verify(messageStore, 
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+    }
+
     public static PopCheckPoint buildPopCheckPoint(long startOffset, long 
popTime, long reviveOffset) {
         PopCheckPoint ck = new PopCheckPoint();
         ck.setStartOffset(startOffset);
@@ -214,7 +373,8 @@ public class PopReviveServiceTest {
         ck.setNum((byte) 1);
         ck.setBitMap(0);
         ck.setReviveOffset(reviveOffset);
-        ck.setInvisibleTime(1000);
+        ck.setInvisibleTime(INVISIBLE_TIME);
+        ck.setBrokerName("broker-a");
         return ck;
     }
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java 
b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
index e041b66d9c..38e0a20752 100644
--- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -43,6 +43,8 @@ public class PopCheckPoint implements 
Comparable<PopCheckPoint> {
     private List<Integer> queueOffsetDiff;
     @JSONField(name = "bn")
     String brokerName;
+    @JSONField(name = "rp")
+    String rePutTimes; // ck rePut times
 
     public long getReviveOffset() {
         return reviveOffset;
@@ -136,6 +138,14 @@ public class PopCheckPoint implements 
Comparable<PopCheckPoint> {
         this.brokerName = brokerName;
     }
 
+    public String getRePutTimes() {
+        return rePutTimes;
+    }
+
+    public void setRePutTimes(String rePutTimes) {
+        this.rePutTimes = rePutTimes;
+    }
+
     public void addDiff(int diff) {
         if (this.queueOffsetDiff == null) {
             this.queueOffsetDiff = new ArrayList<>(8);
@@ -171,10 +181,21 @@ public class PopCheckPoint implements 
Comparable<PopCheckPoint> {
         return startOffset + queueOffsetDiff.get(index);
     }
 
+    public int parseRePutTimes() {
+        if (null == rePutTimes) {
+            return 0;
+        }
+        try {
+            return Integer.parseInt(rePutTimes);
+        } catch (Exception e) {
+        }
+        return Byte.MAX_VALUE;
+    }
+
     @Override
     public String toString() {
         return "PopCheckPoint [topic=" + topic + ", cid=" + cid + ", queueId=" 
+ queueId + ", startOffset=" + startOffset + ", bitMap=" + bitMap + ", num=" + 
num + ", reviveTime=" + getReviveTime()
-            + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff + 
", brokerName=" + brokerName + "]";
+            + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff + 
", brokerName=" + brokerName + ", rePutTimes=" + rePutTimes + "]";
     }
 
     @Override

Reply via email to