This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 84156084a4 [ISSUE #7321] Refactor NettyRemotingAbstract with unified 
future implementation (#7322)
84156084a4 is described below

commit 84156084a4c5228e1d2fe21e068fff330bbc40d1
Author: Zhouxiang Zhan <zhouxz...@apache.org>
AuthorDate: Sun Oct 8 11:13:25 2023 +0800

    [ISSUE #7321] Refactor NettyRemotingAbstract with unified future 
implementation (#7322)
    
    * Refactor NettyRemotingAbstract
    
    * Add invoke with future method
    
    * Deprecate InvokeCallback#operationComplete
    
    * Add operationSuccess and operationException for InvokeCallback
    
    * fix unit test
    
    * fix unit test
    
    * Keep InvokeCallback#operationComplete
    
    * Optimize invokeAsyncImpl operationComplete
    
    * Add unit test for NettyRemotingClient
    
    * fix checkstyle
---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 147 ++++++----
 .../apache/rocketmq/client/impl/MQAdminImpl.java   |  71 ++---
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 239 ++++++++--------
 .../client/impl/mqclient/MQClientAPIExt.java       | 309 +++++++++------------
 .../rocketmq/client/impl/MQClientAPIImplTest.java  |  12 +-
 .../proxy/remoting/RemotingProtocolServer.java     |  22 +-
 .../proxy/service/mqclient/MQClientAPIExtTest.java |  97 +++----
 .../apache/rocketmq/remoting/InvokeCallback.java   |  15 +
 .../apache/rocketmq/remoting/RemotingClient.java   |  27 +-
 .../remoting/netty/NettyRemotingAbstract.java      | 123 +++++---
 .../remoting/netty/NettyRemotingClient.java        |  33 ++-
 .../rocketmq/remoting/netty/ResponseFuture.java    |  15 +
 .../rocketmq/remoting/rpc/RpcClientImpl.java       |  29 +-
 .../rocketmq/remoting/RemotingServerTest.java      |  22 +-
 .../rocketmq/remoting/netty/MockChannel.java       |  21 +-
 .../remoting/netty/MockChannelPromise.java         | 191 +++++++++++++
 .../remoting/netty/NettyRemotingAbstractTest.java  |  54 +++-
 .../remoting/netty/NettyRemotingClientTest.java    | 185 +++++++++++-
 18 files changed, 1029 insertions(+), 583 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 9dfb8127d6..6fde48dd99 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -73,6 +73,7 @@ import 
org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.BrokerSyncInfo;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -107,6 +108,8 @@ import 
org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
@@ -124,8 +127,6 @@ import 
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerReques
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterTopicRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
@@ -151,7 +152,6 @@ public class BrokerOuterAPI {
     private final RpcClient rpcClient;
     private String nameSrvAddr = null;
 
-
     public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
         this(nettyClientConfig, new DynamicalExtFieldRPCHook(), new 
ClientMetadata());
     }
@@ -459,7 +459,7 @@ public class BrokerOuterAPI {
      * @param filterServerList
      * @param oneway
      * @param timeoutMills
-     * @param compressed default false
+     * @param compressed         default false
      * @return
      */
     public List<RegisterBrokerResult> registerBrokerAll(
@@ -643,7 +643,6 @@ public class BrokerOuterAPI {
         queueDatas.add(queueData);
         final byte[] topicRouteBody = topicRouteData.encode();
 
-
         List<String> nameServerAddressList = 
this.remotingClient.getNameServerAddressList();
         final CountDownLatch countDownLatch = new 
CountDownLatch(nameServerAddressList.size());
         for (final String namesrvAddr : nameServerAddressList) {
@@ -910,25 +909,33 @@ public class BrokerOuterAPI {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
 
         request.setBody(requestBody.encode());
-        this.remotingClient.invokeAsync(addr, request, timeoutMillis, 
responseFuture -> {
-            if (callback == null) {
-                return;
+        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new 
InvokeCallback() {
+            @Override
+            public void operationComplete(ResponseFuture responseFuture) {
+
             }
 
-            try {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    if (response.getCode() == ResponseCode.SUCCESS) {
-                        LockBatchResponseBody responseBody = 
LockBatchResponseBody.decode(response.getBody(),
-                            LockBatchResponseBody.class);
-                        Set<MessageQueue> messageQueues = 
responseBody.getLockOKMQSet();
-                        callback.onSuccess(messageQueues);
-                    } else {
-                        callback.onException(new 
MQBrokerException(response.getCode(), response.getRemark()));
-                    }
+            @Override
+            public void operationSucceed(RemotingCommand response) {
+                if (callback == null) {
+                    return;
                 }
-            } catch (Throwable ignored) {
+                if (response.getCode() == ResponseCode.SUCCESS) {
+                    LockBatchResponseBody responseBody = 
LockBatchResponseBody.decode(response.getBody(),
+                        LockBatchResponseBody.class);
+                    Set<MessageQueue> messageQueues = 
responseBody.getLockOKMQSet();
+                    callback.onSuccess(messageQueues);
+                } else {
+                    callback.onException(new 
MQBrokerException(response.getCode(), response.getRemark()));
+                }
+            }
 
+            @Override
+            public void operationFail(Throwable throwable) {
+                if (callback == null) {
+                    return;
+                }
+                callback.onException(throwable);
             }
         });
     }
@@ -942,22 +949,30 @@ public class BrokerOuterAPI {
 
         request.setBody(requestBody.encode());
 
-        this.remotingClient.invokeAsync(addr, request, timeoutMillis, 
responseFuture -> {
-            if (callback == null) {
-                return;
+        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new 
InvokeCallback() {
+            @Override
+            public void operationComplete(ResponseFuture responseFuture) {
+
             }
 
-            try {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    if (response.getCode() == ResponseCode.SUCCESS) {
-                        callback.onSuccess();
-                    } else {
-                        callback.onException(new 
MQBrokerException(response.getCode(), response.getRemark()));
-                    }
+            @Override
+            public void operationSucceed(RemotingCommand response) {
+                if (callback == null) {
+                    return;
                 }
-            } catch (Throwable ignored) {
+                if (response.getCode() == ResponseCode.SUCCESS) {
+                    callback.onSuccess();
+                } else {
+                    callback.onException(new 
MQBrokerException(response.getCode(), response.getRemark()));
+                }
+            }
 
+            @Override
+            public void operationFail(Throwable throwable) {
+                if (callback == null) {
+                    return;
+                }
+                callback.onException(throwable);
             }
         });
     }
@@ -983,21 +998,27 @@ public class BrokerOuterAPI {
         CompletableFuture<SendResult> cf = new CompletableFuture<>();
         final String msgId = msg.getMsgId();
         try {
-            this.remotingClient.invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (null != response) {
-                    SendResult sendResult = null;
+            this.remotingClient.invokeAsync(brokerAddr, request, 
timeoutMillis, new InvokeCallback() {
+                @Override
+                public void operationComplete(ResponseFuture responseFuture) {
+
+                }
+
+                @Override
+                public void operationSucceed(RemotingCommand response) {
                     try {
-                        sendResult = this.processSendResponse(brokerName, msg, 
response);
+                        SendResult sendResult = 
processSendResponse(brokerName, msg, response);
                         cf.complete(sendResult);
                     } catch (MQBrokerException | RemotingCommandException e) {
                         LOGGER.error("processSendResponse in 
sendMessageToSpecificBrokerAsync failed, msgId=" + msgId, e);
                         cf.completeExceptionally(e);
                     }
-                } else {
-                    cf.complete(null);
                 }
 
+                @Override
+                public void operationFail(Throwable throwable) {
+                    cf.completeExceptionally(throwable);
+                }
             });
         } catch (Throwable t) {
             LOGGER.error("invokeAsync failed in 
sendMessageToSpecificBrokerAsync, msgId=" + msgId, t);
@@ -1057,7 +1078,7 @@ public class BrokerOuterAPI {
         }
         if (sendStatus != null) {
             SendMessageResponseHeader responseHeader =
-                    (SendMessageResponseHeader) 
response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+                (SendMessageResponseHeader) 
response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
 
             //If namespace not null , reset Topic without namespace.
             String topic = msg.getTopic();
@@ -1073,8 +1094,8 @@ public class BrokerOuterAPI {
                 uniqMsgId = sb.toString();
             }
             SendResult sendResult = new SendResult(sendStatus,
-                    uniqMsgId,
-                    responseHeader.getMsgId(), messageQueue, 
responseHeader.getQueueOffset());
+                uniqMsgId,
+                responseHeader.getMsgId(), messageQueue, 
responseHeader.getQueueOffset());
             sendResult.setTransactionId(responseHeader.getTransactionId());
             String regionId = 
response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
             String traceOn = 
response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -1218,8 +1239,9 @@ public class BrokerOuterAPI {
     /**
      * Broker try to elect itself as a master in broker set
      */
-    public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String 
controllerAddress, String clusterName, String brokerName,
-                                                 Long brokerId) throws 
Exception {
+    public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String 
controllerAddress, String clusterName,
+        String brokerName,
+        Long brokerId) throws Exception {
 
         final ElectMasterRequestHeader requestHeader = 
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, 
requestHeader);
@@ -1237,7 +1259,8 @@ public class BrokerOuterAPI {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public GetNextBrokerIdResponseHeader getNextBrokerId(final String 
clusterName, final String brokerName, final String controllerAddress) throws 
Exception {
+    public GetNextBrokerIdResponseHeader getNextBrokerId(final String 
clusterName, final String brokerName,
+        final String controllerAddress) throws Exception {
         final GetNextBrokerIdRequestHeader requestHeader = new 
GetNextBrokerIdRequestHeader(clusterName, brokerName);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_NEXT_BROKER_ID, 
requestHeader);
         final RemotingCommand response = 
this.remotingClient.invokeSync(controllerAddress, request, 3000);
@@ -1248,7 +1271,8 @@ public class BrokerOuterAPI {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, 
final String brokerName, final Long brokerId, final String registerCheckCode, 
final String controllerAddress) throws Exception {
+    public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, 
final String brokerName,
+        final Long brokerId, final String registerCheckCode, final String 
controllerAddress) throws Exception {
         final ApplyBrokerIdRequestHeader requestHeader = new 
ApplyBrokerIdRequestHeader(clusterName, brokerName, brokerId, 
registerCheckCode);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_APPLY_BROKER_ID, 
requestHeader);
         final RemotingCommand response = 
this.remotingClient.invokeSync(controllerAddress, request, 3000);
@@ -1259,7 +1283,9 @@ public class BrokerOuterAPI {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> 
registerBrokerToController(final String clusterName, final String brokerName, 
final Long brokerId, final String brokerAddress, final String 
controllerAddress) throws Exception {
+    public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> 
registerBrokerToController(
+        final String clusterName, final String brokerName, final Long 
brokerId, final String brokerAddress,
+        final String controllerAddress) throws Exception {
         final RegisterBrokerToControllerRequestHeader requestHeader = new 
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId, 
brokerAddress);
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, 
requestHeader);
         final RemotingCommand response = 
this.remotingClient.invokeSync(controllerAddress, request, 3000);
@@ -1355,16 +1381,25 @@ public class BrokerOuterAPI {
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
         CompletableFuture<PullResult> pullResultFuture = new 
CompletableFuture<>();
-        this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, 
responseFuture -> {
-            if (responseFuture.getCause() != null) {
-                pullResultFuture.complete(new 
PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
-                return;
+        this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, 
new InvokeCallback() {
+            @Override
+            public void operationComplete(ResponseFuture responseFuture) {
+
             }
-            try {
-                PullResultExt pullResultExt = 
this.processPullResponse(responseFuture.getResponseCommand(), brokerAddr);
-                this.processPullResult(pullResultExt, brokerName, queueId);
-                pullResultFuture.complete(pullResultExt);
-            } catch (Exception e) {
+
+            @Override
+            public void operationSucceed(RemotingCommand response) {
+                try {
+                    PullResultExt pullResultExt = 
processPullResponse(response, brokerAddr);
+                    processPullResult(pullResultExt, brokerName, queueId);
+                    pullResultFuture.complete(pullResultExt);
+                } catch (Exception e) {
+                    pullResultFuture.complete(new 
PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
+                }
+            }
+
+            @Override
+            public void operationFail(Throwable throwable) {
                 pullResultFuture.complete(new 
PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
             }
         });
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 1ef3a94835..83835bd3d3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -44,6 +44,8 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -55,8 +57,6 @@ import 
org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.QueryMessageResponseHeader;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 public class MQAdminImpl {
 
@@ -357,44 +357,51 @@ public class MQAdminImpl {
                             new InvokeCallback() {
                                 @Override
                                 public void operationComplete(ResponseFuture 
responseFuture) {
+
+                                }
+
+                                @Override
+                                public void operationSucceed(RemotingCommand 
response) {
                                     try {
-                                        RemotingCommand response = 
responseFuture.getResponseCommand();
-                                        if (response != null) {
-                                            switch (response.getCode()) {
-                                                case ResponseCode.SUCCESS: {
-                                                    QueryMessageResponseHeader 
responseHeader = null;
-                                                    try {
-                                                        responseHeader =
-                                                            
(QueryMessageResponseHeader) response
-                                                                
.decodeCommandCustomHeader(QueryMessageResponseHeader.class);
-                                                    } catch 
(RemotingCommandException e) {
-                                                        
log.error("decodeCommandCustomHeader exception", e);
-                                                        return;
-                                                    }
-
-                                                    List<MessageExt> wrappers =
-                                                        
MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
-
-                                                    QueryResult qr = new 
QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
-                                                    try {
-                                                        
lock.writeLock().lock();
-                                                        
queryResultList.add(qr);
-                                                    } finally {
-                                                        
lock.writeLock().unlock();
-                                                    }
-                                                    break;
+                                        switch (response.getCode()) {
+                                            case ResponseCode.SUCCESS: {
+                                                QueryMessageResponseHeader 
responseHeader = null;
+                                                try {
+                                                    responseHeader =
+                                                        
(QueryMessageResponseHeader) response
+                                                            
.decodeCommandCustomHeader(QueryMessageResponseHeader.class);
+                                                } catch 
(RemotingCommandException e) {
+                                                    
log.error("decodeCommandCustomHeader exception", e);
+                                                    return;
                                                 }
-                                                default:
-                                                    
log.warn("getResponseCommand failed, {} {}", response.getCode(), 
response.getRemark());
-                                                    break;
+
+                                                List<MessageExt> wrappers =
+                                                    
MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
+
+                                                QueryResult qr = new 
QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
+                                                try {
+                                                    lock.writeLock().lock();
+                                                    queryResultList.add(qr);
+                                                } finally {
+                                                    lock.writeLock().unlock();
+                                                }
+                                                break;
                                             }
-                                        } else {
-                                            log.warn("getResponseCommand 
return null");
+                                            default:
+                                                log.warn("getResponseCommand 
failed, {} {}", response.getCode(), response.getRemark());
+                                                break;
                                         }
+
                                     } finally {
                                         countDownLatch.countDown();
                                     }
                                 }
+
+                                @Override
+                                public void operationFail(Throwable throwable) 
{
+                                    log.error("queryMessage error, 
requestHeader={}", requestHeader);
+                                    countDownLatch.countDown();
+                                }
                             }, isUniqKey);
                     } catch (Exception e) {
                         log.warn("queryMessage exception", e);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 3201a493f7..2407e57373 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.Validators;
-import org.apache.rocketmq.client.common.ClientErrorCode;
 import org.apache.rocketmq.client.consumer.AckCallback;
 import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.AckStatus;
@@ -653,10 +652,13 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
             this.remotingClient.invokeAsync(addr, request, timeoutMillis, new 
InvokeCallback() {
                 @Override
                 public void operationComplete(ResponseFuture responseFuture) {
-                    long cost = System.currentTimeMillis() - beginStartTime;
-                    RemotingCommand response = 
responseFuture.getResponseCommand();
-                    if (null == sendCallback && response != null) {
 
+                }
+
+                @Override
+                public void operationSucceed(RemotingCommand response) {
+                    long cost = System.currentTimeMillis() - beginStartTime;
+                    if (null == sendCallback) {
                         try {
                             SendResult sendResult = 
MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
                             if (context != null && sendResult != null) {
@@ -666,46 +668,47 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
                         } catch (Throwable e) {
                         }
 
-                        producer.updateFaultItem(brokerName, 
System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true);
+                        producer.updateFaultItem(brokerName, 
System.currentTimeMillis() - beginStartTime, false, true);
                         return;
                     }
 
-                    if (response != null) {
+                    try {
+                        SendResult sendResult = 
MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+                        assert sendResult != null;
+                        if (context != null) {
+                            context.setSendResult(sendResult);
+                            
context.getProducer().executeSendMessageHookAfter(context);
+                        }
+
                         try {
-                            SendResult sendResult = 
MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
-                            assert sendResult != null;
-                            if (context != null) {
-                                context.setSendResult(sendResult);
-                                
context.getProducer().executeSendMessageHookAfter(context);
-                            }
+                            sendCallback.onSuccess(sendResult);
+                        } catch (Throwable e) {
+                        }
 
-                            try {
-                                sendCallback.onSuccess(sendResult);
-                            } catch (Throwable e) {
-                            }
+                        producer.updateFaultItem(brokerName, 
System.currentTimeMillis() - beginStartTime, false, true);
+                    } catch (Exception e) {
+                        producer.updateFaultItem(brokerName, 
System.currentTimeMillis() - beginStartTime, true, true);
+                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, 
request, sendCallback, topicPublishInfo, instance,
+                            retryTimesWhenSendFailed, times, e, context, 
false, producer);
+                    }
+                }
 
-                            producer.updateFaultItem(brokerName, 
System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true);
-                        } catch (Exception e) {
-                            producer.updateFaultItem(brokerName, 
System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true);
-                            onExceptionImpl(brokerName, msg, timeoutMillis - 
cost, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, e, context, 
false, producer);
-                        }
+                @Override
+                public void operationFail(Throwable throwable) {
+                    producer.updateFaultItem(brokerName, 
System.currentTimeMillis() - beginStartTime, true, true);
+                    long cost = System.currentTimeMillis() - beginStartTime;
+                    if (throwable instanceof RemotingSendRequestException) {
+                        MQClientException ex = new MQClientException("send 
request failed", throwable);
+                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, 
request, sendCallback, topicPublishInfo, instance,
+                            retryTimesWhenSendFailed, times, ex, context, 
true, producer);
+                    } else if (throwable instanceof RemotingTimeoutException) {
+                        MQClientException ex = new MQClientException("wait 
response timeout, cost=" + cost, throwable);
+                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, 
request, sendCallback, topicPublishInfo, instance,
+                            retryTimesWhenSendFailed, times, ex, context, 
true, producer);
                     } else {
-                        producer.updateFaultItem(brokerName, 
System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true);
-                        if (!responseFuture.isSendRequestOK()) {
-                            MQClientException ex = new MQClientException("send 
request failed", responseFuture.getCause());
-                            onExceptionImpl(brokerName, msg, timeoutMillis - 
cost, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, 
true, producer);
-                        } else if (responseFuture.isTimeout()) {
-                            MQClientException ex = new MQClientException("wait 
response timeout " + responseFuture.getTimeoutMillis() + "ms",
-                                responseFuture.getCause());
-                            onExceptionImpl(brokerName, msg, timeoutMillis - 
cost, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, 
true, producer);
-                        } else {
-                            MQClientException ex = new 
MQClientException("unknow reseaon", responseFuture.getCause());
-                            onExceptionImpl(brokerName, msg, timeoutMillis - 
cost, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, 
true, producer);
-                        }
+                        MQClientException ex = new MQClientException("unknow 
reseaon", throwable);
+                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, 
request, sendCallback, topicPublishInfo, instance,
+                            retryTimesWhenSendFailed, times, ex, context, 
true, producer);
                     }
                 }
             });
@@ -857,30 +860,25 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         final long timeoutMillis, final PopCallback popCallback
     ) throws RemotingException, InterruptedException {
         final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader);
-        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new BaseInvokeCallback(MQClientAPIImpl.this) {
+        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
             @Override
-            public void onComplete(ResponseFuture responseFuture) {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    try {
-                        PopResult
-                            popResult = 
MQClientAPIImpl.this.processPopResponse(brokerName, response, 
requestHeader.getTopic(), requestHeader);
-                        assert popResult != null;
-                        popCallback.onSuccess(popResult);
-                    } catch (Exception e) {
-                        popCallback.onException(e);
-                    }
-                } else {
-                    if (!responseFuture.isSendRequestOK()) {
-                        popCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
-                    } else if (responseFuture.isTimeout()) {
-                        popCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
-                            responseFuture.getCause()));
-                    } else {
-                        popCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
-                    }
+            public void operationComplete(ResponseFuture responseFuture) {
+
+            }
+
+            @Override
+            public void operationSucceed(RemotingCommand response) {
+                try {
+                    PopResult popResult = MQClientAPIImpl.this.processPopResponse(brokerName, response, requestHeader.getTopic(), requestHeader);
+                    popCallback.onSuccess(popResult);
+                } catch (Exception e) {
+                    popCallback.onException(e);
                 }
             }
+            @Override
+            public void operationFail(Throwable throwable) {
+                popCallback.onException(throwable);
+            }
         });
     }
 
@@ -959,34 +957,26 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
                 request.setBody(requestBody.encode());
             }
         }
-        this.remotingClient.invokeAsync(addr, request, timeOut, new 
BaseInvokeCallback(MQClientAPIImpl.this) {
+        this.remotingClient.invokeAsync(addr, request, timeOut, new 
InvokeCallback() {
+            @Override
+            public void operationComplete(ResponseFuture responseFuture) {
+
+            }
 
             @Override
-            public void onComplete(ResponseFuture responseFuture) {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    try {
-                        AckResult ackResult = new AckResult();
-                        if (ResponseCode.SUCCESS == response.getCode()) {
-                            ackResult.setStatus(AckStatus.OK);
-                        } else {
-                            ackResult.setStatus(AckStatus.NO_EXIST);
-                        }
-                        ackCallback.onSuccess(ackResult);
-                    } catch (Exception e) {
-                        ackCallback.onException(e);
-                    }
+            public void operationSucceed(RemotingCommand response) {
+                AckResult ackResult = new AckResult();
+                if (ResponseCode.SUCCESS == response.getCode()) {
+                    ackResult.setStatus(AckStatus.OK);
                 } else {
-                    if (!responseFuture.isSendRequestOK()) {
-                        ackCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
-                    } else if (responseFuture.isTimeout()) {
-                        ackCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
-                            responseFuture.getCause()));
-                    } else {
-                        ackCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeOut + ". Request: " + request, responseFuture.getCause()));
-                    }
+                    ackResult.setStatus(AckStatus.NO_EXIST);
                 }
+                ackCallback.onSuccess(ackResult);
+            }
 
+            @Override
+            public void operationFail(Throwable throwable) {
+                ackCallback.onException(throwable);
             }
         });
     }
@@ -999,39 +989,37 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         final AckCallback ackCallback
     ) throws RemotingException, MQBrokerException, InterruptedException {
         final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, 
requestHeader);
-        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new 
BaseInvokeCallback(MQClientAPIImpl.this) {
+        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new 
InvokeCallback() {
             @Override
-            public void onComplete(ResponseFuture responseFuture) {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    try {
-                        ChangeInvisibleTimeResponseHeader responseHeader = 
(ChangeInvisibleTimeResponseHeader) 
response.decodeCommandCustomHeader(ChangeInvisibleTimeResponseHeader.class);
-                        AckResult ackResult = new AckResult();
-                        if (ResponseCode.SUCCESS == response.getCode()) {
-                            ackResult.setStatus(AckStatus.OK);
-                            ackResult.setPopTime(responseHeader.getPopTime());
-                            ackResult.setExtraInfo(ExtraInfoUtil
-                                .buildExtraInfo(requestHeader.getOffset(), 
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
-                                    responseHeader.getReviveQid(), 
requestHeader.getTopic(), brokerName, requestHeader.getQueueId()) + 
MessageConst.KEY_SEPARATOR
-                                + requestHeader.getOffset());
-                        } else {
-                            ackResult.setStatus(AckStatus.NO_EXIST);
-                        }
-                        ackCallback.onSuccess(ackResult);
-                    } catch (Exception e) {
-                        ackCallback.onException(e);
-                    }
-                } else {
-                    if (!responseFuture.isSendRequestOK()) {
-                        ackCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
-                    } else if (responseFuture.isTimeout()) {
-                        ackCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
-                            responseFuture.getCause()));
+            public void operationComplete(ResponseFuture responseFuture) {
+
+            }
+
+            @Override
+            public void operationSucceed(RemotingCommand response) {
+                try {
+                    ChangeInvisibleTimeResponseHeader responseHeader = 
(ChangeInvisibleTimeResponseHeader) 
response.decodeCommandCustomHeader(ChangeInvisibleTimeResponseHeader.class);
+                    AckResult ackResult = new AckResult();
+                    if (ResponseCode.SUCCESS == response.getCode()) {
+                        ackResult.setStatus(AckStatus.OK);
+                        ackResult.setPopTime(responseHeader.getPopTime());
+                        ackResult.setExtraInfo(ExtraInfoUtil
+                            .buildExtraInfo(requestHeader.getOffset(), 
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
+                                responseHeader.getReviveQid(), 
requestHeader.getTopic(), brokerName, requestHeader.getQueueId()) + 
MessageConst.KEY_SEPARATOR
+                            + requestHeader.getOffset());
                     } else {
-                        ackCallback.onException(new MQClientException("unknown 
reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + 
request, responseFuture.getCause()));
+                        ackResult.setStatus(AckStatus.NO_EXIST);
                     }
+                    ackCallback.onSuccess(ackResult);
+                } catch (Exception e) {
+                    ackCallback.onException(e);
                 }
             }
+
+            @Override
+            public void operationFail(Throwable throwable) {
+                ackCallback.onException(throwable);
+            }
         });
     }
 
@@ -1044,26 +1032,23 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
             @Override
             public void operationComplete(ResponseFuture responseFuture) {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    try {
-                        PullResult pullResult = 
MQClientAPIImpl.this.processPullResponse(response, addr);
-                        assert pullResult != null;
-                        pullCallback.onSuccess(pullResult);
-                    } catch (Exception e) {
-                        pullCallback.onException(e);
-                    }
-                } else {
-                    if (!responseFuture.isSendRequestOK()) {
-                        pullCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
-                    } else if (responseFuture.isTimeout()) {
-                        pullCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
-                            responseFuture.getCause()));
-                    } else {
-                        pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
-                    }
+
+            }
+
+            @Override
+            public void operationSucceed(RemotingCommand response) {
+                try {
+                    PullResult pullResult = 
MQClientAPIImpl.this.processPullResponse(response, addr);
+                    pullCallback.onSuccess(pullResult);
+                } catch (Exception e) {
+                    pullCallback.onException(e);
                 }
             }
+
+            @Override
+            public void operationFail(Throwable throwable) {
+                pullCallback.onException(throwable);
+            }
         });
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index d7c8ef8d92..f3102e1759 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -30,7 +30,6 @@ import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.OffsetNotFoundException;
 import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
 import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -47,6 +46,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
@@ -106,19 +106,6 @@ public class MQClientAPIExt extends MQClientAPIImpl {
         return false;
     }
 
-    protected static MQClientException processNullResponseErr(ResponseFuture 
responseFuture) {
-        MQClientException ex;
-        if (!responseFuture.isSendRequestOK()) {
-            ex = new MQClientException("send request failed", 
responseFuture.getCause());
-        } else if (responseFuture.isTimeout()) {
-            ex = new MQClientException("wait response timeout " + 
responseFuture.getTimeoutMillis() + "ms",
-                responseFuture.getCause());
-        } else {
-            ex = new MQClientException("unknown reason", 
responseFuture.getCause());
-        }
-        return ex;
-    }
-
     public CompletableFuture<Void> sendHeartbeatOneway(
         String brokerAddr,
         HeartbeatData heartbeatData,
@@ -146,24 +133,15 @@ public class MQClientAPIExt extends MQClientAPIImpl {
         request.setLanguage(clientConfig.getLanguage());
         request.setBody(heartbeatData.encode());
 
-        CompletableFuture<Integer> future = new CompletableFuture<>();
-        try {
-            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    if (ResponseCode.SUCCESS == response.getCode()) {
-                        future.complete(response.getVersion());
-                    } else {
-                        future.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark(), brokerAddr));
-                    }
-                } else {
-                    future.completeExceptionally(processNullResponseErr(responseFuture));
-                }
-            });
-        } catch (Throwable t) {
-            future.completeExceptionally(t);
-        }
-        return future;
+        return this.getRemotingClient().invoke(brokerAddr, request, 
timeoutMillis).thenCompose(response -> {
+            CompletableFuture<Integer> future0 = new CompletableFuture<>();
+            if (ResponseCode.SUCCESS == response.getCode()) {
+                future0.complete(response.getVersion());
+            } else {
+                future0.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark(), brokerAddr));
+            }
+            return future0;
+        });
     }
 
     public CompletableFuture<SendResult> sendMessageAsync(
@@ -177,24 +155,15 @@ public class MQClientAPIExt extends MQClientAPIImpl {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, 
requestHeaderV2);
         request.setBody(msg.getBody());
 
-        CompletableFuture<SendResult> future = new CompletableFuture<>();
-        try {
-            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    try {
-                        future.complete(this.processSendResponse(brokerName, 
msg, response, brokerAddr));
-                    } catch (Exception e) {
-                        future.completeExceptionally(e);
-                    }
-                } else {
-                    
future.completeExceptionally(processNullResponseErr(responseFuture));
-                }
-            });
-        } catch (Throwable t) {
-            future.completeExceptionally(t);
-        }
-        return future;
+        return this.getRemotingClient().invoke(brokerAddr, request, 
timeoutMillis).thenCompose(response -> {
+            CompletableFuture<SendResult> future0 = new CompletableFuture<>();
+            try {
+                future0.complete(this.processSendResponse(brokerName, msg, 
response, brokerAddr));
+            } catch (Exception e) {
+                future0.completeExceptionally(e);
+            }
+            return future0;
+        });
     }
 
     public CompletableFuture<SendResult> sendMessageAsync(
@@ -216,17 +185,14 @@ public class MQClientAPIExt extends MQClientAPIImpl {
             msgBatch.setBody(body);
 
             request.setBody(body);
-            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    try {
-                        future.complete(this.processSendResponse(brokerName, 
msgBatch, response, brokerAddr));
-                    } catch (Exception e) {
-                        future.completeExceptionally(e);
-                    }
-                } else {
-                    
future.completeExceptionally(processNullResponseErr(responseFuture));
+            return this.getRemotingClient().invoke(brokerAddr, request, 
timeoutMillis).thenCompose(response -> {
+                CompletableFuture<SendResult> future0 = new 
CompletableFuture<>();
+                try {
+                    future0.complete(processSendResponse(brokerName, msgBatch, 
response, brokerAddr));
+                } catch (Exception e) {
+                    future0.completeExceptionally(e);
                 }
+                return future0;
             });
         } catch (Throwable t) {
             future.completeExceptionally(t);
@@ -240,21 +206,7 @@ public class MQClientAPIExt extends MQClientAPIImpl {
         long timeoutMillis
     ) {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, 
requestHeader);
-
-        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
-        try {
-            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    future.complete(response);
-                } else {
-                    
future.completeExceptionally(processNullResponseErr(responseFuture));
-                }
-            });
-        } catch (Throwable t) {
-            future.completeExceptionally(t);
-        }
-        return future;
+        return this.getRemotingClient().invoke(brokerAddr, request, 
timeoutMillis);
     }
 
     public CompletableFuture<PopResult> popMessageAsync(
@@ -402,38 +354,31 @@ public class MQClientAPIExt extends MQClientAPIImpl {
         QueryConsumerOffsetRequestHeader requestHeader,
         long timeoutMillis
     ) {
-        CompletableFuture<Long> future = new CompletableFuture<>();
-        try {
-            RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, 
requestHeader);
-            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    switch (response.getCode()) {
-                        case ResponseCode.SUCCESS: {
-                            try {
-                                QueryConsumerOffsetResponseHeader 
responseHeader =
-                                    (QueryConsumerOffsetResponseHeader) 
response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
-                                future.complete(responseHeader.getOffset());
-                            } catch (RemotingCommandException e) {
-                                future.completeExceptionally(e);
-                            }
-                            break;
-                        }
-                        case ResponseCode.QUERY_NOT_FOUND: {
-                            future.completeExceptionally(new 
OffsetNotFoundException(response.getCode(), response.getRemark(), brokerAddr));
-                            break;
-                        }
-                        default:
-                            break;
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
+        return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> {
+            CompletableFuture<Long> future0 = new CompletableFuture<>();
+            switch (response.getCode()) {
+                case ResponseCode.SUCCESS: {
+                    try {
+                        QueryConsumerOffsetResponseHeader responseHeader =
+                            (QueryConsumerOffsetResponseHeader) 
response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+                        future0.complete(responseHeader.getOffset());
+                    } catch (RemotingCommandException e) {
+                        future0.completeExceptionally(e);
                     }
-                } else {
-                    
future.completeExceptionally(processNullResponseErr(responseFuture));
+                    break;
                 }
-            });
-        } catch (Throwable t) {
-            future.completeExceptionally(t);
-        }
-        return future;
+                case ResponseCode.QUERY_NOT_FOUND: {
+                    future0.completeExceptionally(new 
OffsetNotFoundException(response.getCode(), response.getRemark(), brokerAddr));
+                    break;
+                }
+                default: {
+                    future0.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark()));
+                    break;
+                }
+            }
+            return future0;
+        });
     }
 
     public CompletableFuture<Void> updateConsumerOffsetOneWay(
@@ -461,9 +406,14 @@ public class MQClientAPIExt extends MQClientAPIImpl {
 
         CompletableFuture<List<String>> future = new CompletableFuture<>();
         try {
-            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
+            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, new InvokeCallback() {
+                @Override
+                public void operationComplete(ResponseFuture responseFuture) {
+
+                }
+
+                @Override
+                public void operationSucceed(RemotingCommand response) {
                     switch (response.getCode()) {
                         case ResponseCode.SUCCESS: {
                             if (response.getBody() != null) {
@@ -485,8 +435,11 @@ public class MQClientAPIExt extends MQClientAPIImpl {
                             break;
                     }
                     future.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark()));
-                } else {
-                    
future.completeExceptionally(processNullResponseErr(responseFuture));
+                }
+
+                @Override
+                public void operationFail(Throwable throwable) {
+                    future.completeExceptionally(throwable);
                 }
             });
         } catch (Throwable t) {
@@ -501,9 +454,14 @@ public class MQClientAPIExt extends MQClientAPIImpl {
 
         CompletableFuture<Long> future = new CompletableFuture<>();
         try {
-            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
+            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, new InvokeCallback() {
+                @Override
+                public void operationComplete(ResponseFuture responseFuture) {
+
+                }
+
+                @Override
+                public void operationSucceed(RemotingCommand response) {
                     if (ResponseCode.SUCCESS == response.getCode()) {
                         try {
                             GetMaxOffsetResponseHeader responseHeader = 
(GetMaxOffsetResponseHeader) 
response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
@@ -513,8 +471,11 @@ public class MQClientAPIExt extends MQClientAPIImpl {
                         }
                     }
                     future.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark()));
-                } else {
-                    
future.completeExceptionally(processNullResponseErr(responseFuture));
+                }
+
+                @Override
+                public void operationFail(Throwable throwable) {
+                    future.completeExceptionally(throwable);
                 }
             });
         } catch (Throwable t) {
@@ -529,9 +490,14 @@ public class MQClientAPIExt extends MQClientAPIImpl {
 
         CompletableFuture<Long> future = new CompletableFuture<>();
         try {
-            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
+            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, new InvokeCallback() {
+                @Override
+                public void operationComplete(ResponseFuture responseFuture) {
+
+                }
+
+                @Override
+                public void operationSucceed(RemotingCommand response) {
                     if (ResponseCode.SUCCESS == response.getCode()) {
                         try {
                             GetMinOffsetResponseHeader responseHeader = 
(GetMinOffsetResponseHeader) 
response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
@@ -541,8 +507,11 @@ public class MQClientAPIExt extends MQClientAPIImpl {
                         }
                     }
                     future.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark()));
-                } else {
-                    
future.completeExceptionally(processNullResponseErr(responseFuture));
+                }
+
+                @Override
+                public void operationFail(Throwable throwable) {
+                    future.completeExceptionally(throwable);
                 }
             });
         } catch (Throwable t) {
@@ -555,57 +524,41 @@ public class MQClientAPIExt extends MQClientAPIImpl {
         long timeoutMillis) {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, 
requestHeader);
 
-        CompletableFuture<Long> future = new CompletableFuture<>();
-        try {
-            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    if (response.getCode() == ResponseCode.SUCCESS) {
-                        try {
-                            SearchOffsetResponseHeader responseHeader = 
(SearchOffsetResponseHeader) 
response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
-                            future.complete(responseHeader.getOffset());
-                        } catch (Throwable t) {
-                            future.completeExceptionally(t);
-                        }
-                    }
-                    future.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark()));
-                } else {
-                    
future.completeExceptionally(processNullResponseErr(responseFuture));
+        return this.getRemotingClient().invoke(brokerAddr, request, 
timeoutMillis).thenCompose(response -> {
+            CompletableFuture<Long> future0 = new CompletableFuture<>();
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                try {
+                    SearchOffsetResponseHeader responseHeader = 
(SearchOffsetResponseHeader) 
response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+                    future0.complete(responseHeader.getOffset());
+                } catch (Throwable t) {
+                    future0.completeExceptionally(t);
                 }
-            });
-        } catch (Throwable t) {
-            future.completeExceptionally(t);
-        }
-        return future;
+            } else {
+                future0.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark()));
+            }
+            return future0;
+        });
     }
 
     public CompletableFuture<Set<MessageQueue>> lockBatchMQWithFuture(String brokerAddr,
         LockBatchRequestBody requestBody, long timeoutMillis) {
-        CompletableFuture<Set<MessageQueue>> future = new CompletableFuture<>();
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
         request.setBody(requestBody.encode());
-        try {
-            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    if (response.getCode() == ResponseCode.SUCCESS) {
-                        try {
-                            LockBatchResponseBody responseBody = 
LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
-                            Set<MessageQueue> messageQueues = 
responseBody.getLockOKMQSet();
-                            future.complete(messageQueues);
-                        } catch (Throwable t) {
-                            future.completeExceptionally(t);
-                        }
-                    }
-                    future.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark()));
-                } else {
-                    
future.completeExceptionally(processNullResponseErr(responseFuture));
+        return this.getRemotingClient().invoke(brokerAddr, request, 
timeoutMillis).thenCompose(response -> {
+            CompletableFuture<Set<MessageQueue>> future0 = new 
CompletableFuture<>();
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                try {
+                    LockBatchResponseBody responseBody = 
LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
+                    Set<MessageQueue> messageQueues = 
responseBody.getLockOKMQSet();
+                    future0.complete(messageQueues);
+                } catch (Throwable t) {
+                    future0.completeExceptionally(t);
                 }
-            });
-        } catch (Exception e) {
-            future.completeExceptionally(e);
-        }
-        return future;
+            } else {
+                future0.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark()));
+            }
+            return future0;
+        });
     }
 
     public CompletableFuture<Void> unlockBatchMQOneway(String brokerAddr,
@@ -624,25 +577,21 @@ public class MQClientAPIExt extends MQClientAPIImpl {
 
     public CompletableFuture<Boolean> notification(String brokerAddr, 
NotificationRequestHeader requestHeader,
         long timeoutMillis) {
-        CompletableFuture<Boolean> future = new CompletableFuture<>();
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.NOTIFICATION, requestHeader);
-        try {
-            this.getRemotingClient().invoke(brokerAddr, request, 
timeoutMillis).thenAccept(response -> {
-                if (response.getCode() == ResponseCode.SUCCESS) {
-                    try {
-                        NotificationResponseHeader responseHeader = 
(NotificationResponseHeader) 
response.decodeCommandCustomHeader(NotificationResponseHeader.class);
-                        future.complete(responseHeader.isHasMsg());
-                    } catch (Throwable t) {
-                        future.completeExceptionally(t);
-                    }
-                } else {
-                    future.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark()));
+        return this.getRemotingClient().invoke(brokerAddr, request, 
timeoutMillis).thenCompose(response -> {
+            CompletableFuture<Boolean> future0 = new CompletableFuture<>();
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                try {
+                    NotificationResponseHeader responseHeader = 
(NotificationResponseHeader) 
response.decodeCommandCustomHeader(NotificationResponseHeader.class);
+                    future0.complete(responseHeader.isHasMsg());
+                } catch (Throwable t) {
+                    future0.completeExceptionally(t);
                 }
-            });
-        } catch (Throwable t) {
-            future.completeExceptionally(t);
-        }
-        return future;
+            } else {
+                future0.completeExceptionally(new 
MQBrokerException(response.getCode(), response.getRemark()));
+            }
+            return future0;
+        });
     }
 
     public CompletableFuture<RemotingCommand> invoke(String brokerAddr, 
RemotingCommand request, long timeoutMillis) {
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index d13f2cfe43..c152d38ea5 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -212,7 +212,7 @@ public class MQClientAPIImplTest {
                 RemotingCommand request = mock.getArgument(1);
                 ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
                 
responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
-                callback.operationComplete(responseFuture);
+                callback.operationSucceed(responseFuture.getResponseCommand());
                 return null;
             }
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -386,7 +386,7 @@ public class MQClientAPIImplTest {
                 RemotingCommand request = mock.getArgument(1);
                 ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
                 
responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
-                callback.operationComplete(responseFuture);
+                callback.operationSucceed(responseFuture.getResponseCommand());
                 return null;
             }
         }).when(remotingClient).invokeAsync(Matchers.anyString(), 
Matchers.any(RemotingCommand.class), Matchers.anyLong(), 
Matchers.any(InvokeCallback.class));
@@ -472,7 +472,7 @@ public class MQClientAPIImplTest {
                 message.putUserProperty("key", "value");
                 response.setBody(MessageDecoder.encode(message, false));
                 responseFuture.setResponseCommand(response);
-                callback.operationComplete(responseFuture);
+                callback.operationSucceed(responseFuture.getResponseCommand());
                 return null;
             }
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -543,7 +543,7 @@ public class MQClientAPIImplTest {
                 
message.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, 
String.valueOf(0));
                 response.setBody(MessageDecoder.encode(message, false));
                 responseFuture.setResponseCommand(response);
-                callback.operationComplete(responseFuture);
+                callback.operationSucceed(responseFuture.getResponseCommand());
                 return null;
             }
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -585,7 +585,7 @@ public class MQClientAPIImplTest {
                 response.setOpaque(request.getOpaque());
                 response.setCode(ResponseCode.SUCCESS);
                 responseFuture.setResponseCommand(response);
-                callback.operationComplete(responseFuture);
+                callback.operationSucceed(responseFuture.getResponseCommand());
                 return null;
             }
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -622,7 +622,7 @@ public class MQClientAPIImplTest {
                 responseHeader.setPopTime(System.currentTimeMillis());
                 responseHeader.setInvisibleTime(10 * 1000L);
                 responseFuture.setResponseCommand(response);
-                callback.operationComplete(responseFuture);
+                callback.operationSucceed(responseFuture.getResponseCommand());
                 return null;
             }
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index fe07090d50..3227d1e1c6 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.acl.AccessValidator;
-import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.future.FutureTaskExt;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
@@ -51,10 +50,12 @@ import 
org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
 import org.apache.rocketmq.proxy.remoting.pipeline.AuthenticationPipeline;
 import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
 import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
@@ -239,12 +240,21 @@ public class RemotingProtocolServer implements 
StartAndShutdown, RemotingProxyOu
         long timeoutMillis) {
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
-            this.defaultRemotingServer.invokeAsync(channel, request, 
timeoutMillis, responseFuture -> {
-                if (responseFuture.getResponseCommand() == null) {
-                    future.completeExceptionally(new 
MQClientException("response is null after send request to client", 
responseFuture.getCause()));
-                    return;
+            this.defaultRemotingServer.invokeAsync(channel, request, 
timeoutMillis, new InvokeCallback() {
+                @Override
+                public void operationComplete(ResponseFuture responseFuture) {
+
+                }
+
+                @Override
+                public void operationSucceed(RemotingCommand response) {
+                    future.complete(response);
+                }
+
+                @Override
+                public void operationFail(Throwable throwable) {
+                    future.completeExceptionally(throwable);
                 }
-                future.complete(responseFuture.getResponseCommand());
             });
         } catch (Throwable t) {
             future.completeExceptionally(t);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
index 3f3a4ae40c..e2d05b0f5a 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -85,6 +86,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MQClientAPIExtTest {
@@ -109,13 +111,9 @@ public class MQClientAPIExtTest {
 
     @Test
     public void testSendHeartbeatAsync() throws Exception {
-        doAnswer((Answer<Void>) mock -> {
-            InvokeCallback invokeCallback = mock.getArgument(3);
-            ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000, 
invokeCallback, null);
-            
responseFuture.putResponse(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
 ""));
-            invokeCallback.operationComplete(responseFuture);
-            return null;
-        }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any());
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        
future.complete(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, 
""));
+        doReturn(future).when(remotingClient).invoke(anyString(), 
any(RemotingCommand.class), anyLong());
 
         assertNotNull(mqClientAPI.sendHeartbeatAsync(BROKER_ADDR, new 
HeartbeatData(), TIMEOUT).get());
     }
@@ -123,20 +121,16 @@ public class MQClientAPIExtTest {
     @Test
     public void testSendMessageAsync() throws Exception {
         AtomicReference<String> msgIdRef = new AtomicReference<>();
-        doAnswer((Answer<Void>) mock -> {
-            InvokeCallback invokeCallback = mock.getArgument(3);
-            ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000, 
invokeCallback, null);
-            RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
-            SendMessageResponseHeader sendMessageResponseHeader = 
(SendMessageResponseHeader) response.readCustomHeader();
-            sendMessageResponseHeader.setMsgId(msgIdRef.get());
-            sendMessageResponseHeader.setQueueId(0);
-            sendMessageResponseHeader.setQueueOffset(1L);
-            response.setCode(ResponseCode.SUCCESS);
-            response.makeCustomHeaderToNet();
-            responseFuture.putResponse(response);
-            invokeCallback.operationComplete(responseFuture);
-            return null;
-        }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any());
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+        SendMessageResponseHeader sendMessageResponseHeader = 
(SendMessageResponseHeader) response.readCustomHeader();
+        sendMessageResponseHeader.setMsgId(msgIdRef.get());
+        sendMessageResponseHeader.setQueueId(0);
+        sendMessageResponseHeader.setQueueOffset(1L);
+        response.setCode(ResponseCode.SUCCESS);
+        response.makeCustomHeaderToNet();
+        future.complete(response);
+        doReturn(future).when(remotingClient).invoke(anyString(), 
any(RemotingCommand.class), anyLong());
 
         MessageExt messageExt = createMessage();
         msgIdRef.set(MessageClientIDSetter.getUniqID(messageExt));
@@ -150,20 +144,16 @@ public class MQClientAPIExtTest {
 
     @Test
     public void testSendMessageListAsync() throws Exception {
-        doAnswer((Answer<Void>) mock -> {
-            InvokeCallback invokeCallback = mock.getArgument(3);
-            ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000, 
invokeCallback, null);
-            RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
-            SendMessageResponseHeader sendMessageResponseHeader = 
(SendMessageResponseHeader) response.readCustomHeader();
-            sendMessageResponseHeader.setMsgId("");
-            sendMessageResponseHeader.setQueueId(0);
-            sendMessageResponseHeader.setQueueOffset(1L);
-            response.setCode(ResponseCode.SUCCESS);
-            response.makeCustomHeaderToNet();
-            responseFuture.putResponse(response);
-            invokeCallback.operationComplete(responseFuture);
-            return null;
-        }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any());
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+        SendMessageResponseHeader sendMessageResponseHeader = 
(SendMessageResponseHeader) response.readCustomHeader();
+        sendMessageResponseHeader.setMsgId("");
+        sendMessageResponseHeader.setQueueId(0);
+        sendMessageResponseHeader.setQueueOffset(1L);
+        response.setCode(ResponseCode.SUCCESS);
+        response.makeCustomHeaderToNet();
+        future.complete(response);
+        doReturn(future).when(remotingClient).invoke(anyString(), 
any(RemotingCommand.class), anyLong());
 
         List<MessageExt> messageExtList = new ArrayList<>();
         StringBuilder sb = new StringBuilder();
@@ -182,13 +172,9 @@ public class MQClientAPIExtTest {
 
     @Test
     public void testSendMessageBackAsync() throws Exception {
-        doAnswer((Answer<Void>) mock -> {
-            InvokeCallback invokeCallback = mock.getArgument(3);
-            ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000, 
invokeCallback, null);
-            
responseFuture.putResponse(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
 ""));
-            invokeCallback.operationComplete(responseFuture);
-            return null;
-        }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any());
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        
future.complete(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, 
""));
+        doReturn(future).when(remotingClient).invoke(anyString(), 
any(RemotingCommand.class), anyLong());
 
         RemotingCommand remotingCommand = 
mqClientAPI.sendMessageBackAsync(BROKER_ADDR, new 
ConsumerSendMsgBackRequestHeader(), TIMEOUT)
             .get();
@@ -285,7 +271,7 @@ public class MQClientAPIExtTest {
             body.setConsumerIdList(clientIds);
             response.setBody(body.encode());
             responseFuture.putResponse(response);
-            invokeCallback.operationComplete(responseFuture);
+            
invokeCallback.operationSucceed(responseFuture.getResponseCommand());
             return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any());
 
@@ -302,7 +288,7 @@ public class MQClientAPIExtTest {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.makeCustomHeaderToNet();
             responseFuture.putResponse(response);
-            invokeCallback.operationComplete(responseFuture);
+            
invokeCallback.operationSucceed(responseFuture.getResponseCommand());
             return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any());
 
@@ -322,7 +308,7 @@ public class MQClientAPIExtTest {
             response.setCode(ResponseCode.SUCCESS);
             response.makeCustomHeaderToNet();
             responseFuture.putResponse(response);
-            invokeCallback.operationComplete(responseFuture);
+            
invokeCallback.operationSucceed(responseFuture.getResponseCommand());
             return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any());
 
@@ -335,18 +321,15 @@ public class MQClientAPIExtTest {
     @Test
     public void testSearchOffsetAsync() throws Exception {
         long offset = ThreadLocalRandom.current().nextLong();
-        doAnswer((Answer<Void>) mock -> {
-            InvokeCallback invokeCallback = mock.getArgument(3);
-            ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000, 
invokeCallback, null);
-            RemotingCommand response = 
RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
-            SearchOffsetResponseHeader responseHeader = 
(SearchOffsetResponseHeader) response.readCustomHeader();
-            responseHeader.setOffset(offset);
-            response.setCode(ResponseCode.SUCCESS);
-            response.makeCustomHeaderToNet();
-            responseFuture.putResponse(response);
-            invokeCallback.operationComplete(responseFuture);
-            return null;
-        }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any());
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        RemotingCommand response = 
RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
+        SearchOffsetResponseHeader responseHeader = 
(SearchOffsetResponseHeader) response.readCustomHeader();
+        responseHeader.setOffset(offset);
+        response.setCode(ResponseCode.SUCCESS);
+        response.makeCustomHeaderToNet();
+        future.complete(response);
+
+        doReturn(future).when(remotingClient).invoke(anyString(), 
any(RemotingCommand.class), anyLong());
 
         SearchOffsetRequestHeader requestHeader = new 
SearchOffsetRequestHeader();
         requestHeader.setTopic(TOPIC);
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java
index ce78fa923f..6be4917457 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java
@@ -17,7 +17,22 @@
 package org.apache.rocketmq.remoting;
 
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public interface InvokeCallback {
+    /**
+     * This method is expected to be invoked after {@link #operationSucceed(RemotingCommand)}
+     * or {@link #operationFail(Throwable)}
+     *
+     * @param responseFuture the returned object contains response or exception
+     */
     void operationComplete(final ResponseFuture responseFuture);
+
+    default void operationSucceed(final RemotingCommand response) {
+
+    }
+
+    default void operationFail(final Throwable throwable) {
+
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index ff0b3df95a..c8389eedb1 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -20,11 +20,11 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public interface RemotingClient extends RemotingService {
@@ -51,18 +51,21 @@ public interface RemotingClient extends RemotingService {
         final long timeoutMillis) {
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
-            invokeAsync(addr, request, timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
+            invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
+
+                @Override
+                public void operationComplete(ResponseFuture responseFuture) {
+
+                }
+
+                @Override
+                public void operationSucceed(RemotingCommand response) {
                     future.complete(response);
-                } else {
-                    if (!responseFuture.isSendRequestOK()) {
-                        future.completeExceptionally(new 
RemotingSendRequestException(addr, responseFuture.getCause()));
-                    } else if (responseFuture.isTimeout()) {
-                        future.completeExceptionally(new 
RemotingTimeoutException(addr, timeoutMillis, responseFuture.getCause()));
-                    } else {
-                        future.completeExceptionally(new 
RemotingException(request.toString(), responseFuture.getCause()));
-                    }
+                }
+
+                @Override
+                public void operationFail(Throwable throwable) {
+                    future.completeExceptionally(throwable);
                 }
             });
         } catch (Throwable t) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index fce2de267f..12e66f913c 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -23,20 +23,23 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.opentelemetry.api.common.AttributesBuilder;
-import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.rocketmq.common.AbortProcessException;
@@ -125,7 +128,7 @@ public abstract class NettyRemotingAbstract {
      * Constructor, specifying capacity of one-way and asynchronous semaphores.
      *
      * @param permitsOneway Number of permits for one-way requests.
-     * @param permitsAsync Number of permits for asynchronous requests.
+     * @param permitsAsync  Number of permits for asynchronous requests.
      */
     public NettyRemotingAbstract(final int permitsOneway, final int 
permitsAsync) {
         this.semaphoreOneway = new Semaphore(permitsOneway, true);
@@ -367,8 +370,7 @@ public abstract class NettyRemotingAbstract {
                 responseFuture.release();
             }
         } else {
-            log.warn("receive response, but not matched any request, " + 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-            log.warn(cmd.toString());
+            log.warn("receive response, cmd={}, but not matched any request, address={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
         }
     }
 
@@ -467,57 +469,68 @@ public abstract class NettyRemotingAbstract {
     public RemotingCommand invokeSyncImpl(final Channel channel, final 
RemotingCommand request,
         final long timeoutMillis)
         throws InterruptedException, RemotingSendRequestException, 
RemotingTimeoutException {
-        //get the request id
-        final int opaque = request.getOpaque();
-
         try {
-            final ResponseFuture responseFuture = new ResponseFuture(channel, 
opaque, timeoutMillis, null, null);
-            this.responseTable.put(opaque, responseFuture);
-            final SocketAddress addr = channel.remoteAddress();
-            channel.writeAndFlush(request).addListener((ChannelFutureListener) 
f -> {
-                if (f.isSuccess()) {
-                    responseFuture.setSendRequestOK(true);
-                    return;
-                }
-
-                responseFuture.setSendRequestOK(false);
-                responseTable.remove(opaque);
-                responseFuture.setCause(f.cause());
-                responseFuture.putResponse(null);
-                log.warn("Failed to write a request command to {}, caused by 
underlying I/O operation failure", addr);
-            });
+            return invokeImpl(channel, request, 
timeoutMillis).thenApply(ResponseFuture::getResponseCommand)
+                .get(timeoutMillis, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw new 
RemotingSendRequestException(channel.remoteAddress().toString(), e.getCause());
+        } catch (TimeoutException e) {
+            throw new 
RemotingTimeoutException(channel.remoteAddress().toString(), timeoutMillis, 
e.getCause());
+        }
+    }
 
-            RemotingCommand responseCommand = 
responseFuture.waitResponse(timeoutMillis);
-            if (null == responseCommand) {
-                if (responseFuture.isSendRequestOK()) {
-                    throw new 
RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), 
timeoutMillis,
-                        responseFuture.getCause());
-                } else {
-                    throw new 
RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), 
responseFuture.getCause());
-                }
+    public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, 
final RemotingCommand request,
+        final long timeoutMillis) {
+        String channelRemoteAddr = 
RemotingHelper.parseChannelRemoteAddr(channel);
+        doBeforeRpcHooks(channelRemoteAddr, request);
+        return invoke0(channel, request, timeoutMillis).whenComplete((v, t) -> 
{
+            if (t == null) {
+                doAfterRpcHooks(channelRemoteAddr, request, 
v.getResponseCommand());
             }
-
-            return responseCommand;
-        } finally {
-            this.responseTable.remove(opaque);
-        }
+        });
     }
 
-    public void invokeAsyncImpl(final Channel channel, final RemotingCommand 
request, final long timeoutMillis,
-        final InvokeCallback invokeCallback)
-        throws InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
+    protected CompletableFuture<ResponseFuture> invoke0(final Channel channel, 
final RemotingCommand request,
+        final long timeoutMillis) {
+        CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
         long beginStartTime = System.currentTimeMillis();
         final int opaque = request.getOpaque();
-        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, 
TimeUnit.MILLISECONDS);
+
+        boolean acquired;
+        try {
+            acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, 
TimeUnit.MILLISECONDS);
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+            return future;
+        }
         if (acquired) {
             final SemaphoreReleaseOnlyOnce once = new 
SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
             long costTime = System.currentTimeMillis() - beginStartTime;
             if (timeoutMillis < costTime) {
                 once.release();
-                throw new RemotingTimeoutException("invokeAsyncImpl call 
timeout");
+                future.completeExceptionally(new 
RemotingTimeoutException("invokeAsyncImpl call timeout"));
+                return future;
             }
 
-            final ResponseFuture responseFuture = new ResponseFuture(channel, 
opaque, timeoutMillis - costTime, invokeCallback, once);
+            AtomicReference<ResponseFuture> responseFutureReference = new 
AtomicReference<>();
+            final ResponseFuture responseFuture = new ResponseFuture(channel, 
opaque, request, timeoutMillis - costTime,
+                new InvokeCallback() {
+                    @Override
+                    public void operationComplete(ResponseFuture 
responseFuture) {
+
+                    }
+
+                    @Override
+                    public void operationSucceed(RemotingCommand response) {
+                        future.complete(responseFutureReference.get());
+                    }
+
+                    @Override
+                    public void operationFail(Throwable throwable) {
+                        future.completeExceptionally(throwable);
+                    }
+                }, once);
+            responseFutureReference.set(responseFuture);
             this.responseTable.put(opaque, responseFuture);
             try {
                 
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
@@ -528,15 +541,17 @@ public abstract class NettyRemotingAbstract {
                     requestFail(opaque);
                     log.warn("send a request command to channel <{}> failed.", 
RemotingHelper.parseChannelRemoteAddr(channel));
                 });
+                return future;
             } catch (Exception e) {
                 responseTable.remove(opaque);
                 responseFuture.release();
                 log.warn("send a request command to channel <" + 
RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
-                throw new 
RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
+                future.completeExceptionally(new 
RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), 
e));
+                return future;
             }
         } else {
             if (timeoutMillis <= 0) {
-                throw new RemotingTooMuchRequestException("invokeAsyncImpl 
invoke too fast");
+                future.completeExceptionally(new 
RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"));
             } else {
                 String info =
                     String.format("invokeAsyncImpl tryAcquire semaphore 
timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
@@ -545,11 +560,31 @@ public abstract class NettyRemotingAbstract {
                         this.semaphoreAsync.availablePermits()
                     );
                 log.warn(info);
-                throw new RemotingTimeoutException(info);
+                future.completeExceptionally(new 
RemotingTimeoutException(info));
             }
+            return future;
         }
     }
 
+    public void invokeAsyncImpl(final Channel channel, final RemotingCommand 
request, final long timeoutMillis,
+        final InvokeCallback invokeCallback) {
+        invokeImpl(channel, request, timeoutMillis)
+            .whenComplete((v, t) -> {
+                if (t == null) {
+                    invokeCallback.operationComplete(v);
+                } else {
+                    ResponseFuture responseFuture = new 
ResponseFuture(channel, request.getOpaque(), request, timeoutMillis, null, 
null);
+                    responseFuture.setCause(t);
+                    invokeCallback.operationComplete(responseFuture);
+                }
+            })
+            .thenAccept(responseFuture -> 
invokeCallback.operationSucceed(responseFuture.getResponseCommand()))
+            .exceptionally(t -> {
+                invokeCallback.operationFail(t);
+                return null;
+            });
+    }
+
     private void requestFail(final int opaque) {
         ResponseFuture responseFuture = responseTable.remove(opaque);
         if (responseFuture != null) {
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 64621dd6c4..d784351a5f 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -527,15 +527,13 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         if (channel != null && channel.isActive()) {
             long left = timeoutMillis;
             try {
-                doBeforeRpcHooks(channelRemoteAddr, request);
                 long costTime = System.currentTimeMillis() - beginStartTime;
                 left -= costTime;
                 if (left <= 0) {
                     throw new RemotingTimeoutException("invokeSync call the 
addr[" + channelRemoteAddr + "] timeout");
                 }
                 RemotingCommand response = this.invokeSyncImpl(channel, 
request, left);
-                doAfterRpcHooks(channelRemoteAddr, request, response);
-                this.updateChannelLastResponseTime(addr);
+                updateChannelLastResponseTime(addr);
                 return response;
             } catch (RemotingSendRequestException e) {
                 LOGGER.warn("invokeSync: send request exception, so close the 
channel[{}]", channelRemoteAddr);
@@ -727,18 +725,11 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         final Channel channel = this.getAndCreateChannel(addr);
         String channelRemoteAddr = 
RemotingHelper.parseChannelRemoteAddr(channel);
         if (channel != null && channel.isActive()) {
-            try {
-                doBeforeRpcHooks(channelRemoteAddr, request);
-                long costTime = System.currentTimeMillis() - beginStartTime;
-                if (timeoutMillis < costTime) {
-                    throw new RemotingTooMuchRequestException("invokeAsync 
call the addr[" + channelRemoteAddr + "] timeout");
-                }
-                this.invokeAsyncImpl(channel, request, timeoutMillis - 
costTime, new InvokeCallbackWrapper(invokeCallback, addr));
-            } catch (RemotingSendRequestException e) {
-                LOGGER.warn("invokeAsync: send request exception, so close the 
channel[{}]", channelRemoteAddr);
-                this.closeChannel(addr, channel);
-                throw e;
+            long costTime = System.currentTimeMillis() - beginStartTime;
+            if (timeoutMillis < costTime) {
+                throw new RemotingTooMuchRequestException("invokeAsync call 
the addr[" + channelRemoteAddr + "] timeout");
             }
+            this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, 
new InvokeCallbackWrapper(invokeCallback, addr));
         } else {
             this.closeChannel(addr, channel);
             throw new RemotingConnectException(addr);
@@ -931,11 +922,19 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
 
         @Override
         public void operationComplete(ResponseFuture responseFuture) {
-            if (responseFuture != null && responseFuture.isSendRequestOK() && 
responseFuture.getResponseCommand() != null) {
-                NettyRemotingClient.this.updateChannelLastResponseTime(addr);
-            }
             this.invokeCallback.operationComplete(responseFuture);
         }
+
+        @Override
+        public void operationSucceed(RemotingCommand response) {
+            updateChannelLastResponseTime(addr);
+            this.invokeCallback.operationSucceed(response);
+        }
+
+        @Override
+        public void operationFail(final Throwable throwable) {
+            this.invokeCallback.operationFail(throwable);
+        }
     }
 
     class NettyClientHandler extends 
SimpleChannelInboundHandler<RemotingCommand> {
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
index 19f705d74b..0882818fea 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
@@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class ResponseFuture {
@@ -59,6 +62,18 @@ public class ResponseFuture {
     public void executeInvokeCallback() {
         if (invokeCallback != null) {
             if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
+                RemotingCommand response = getResponseCommand();
+                if (response != null) {
+                    invokeCallback.operationSucceed(response);
+                } else {
+                    if (!isSendRequestOK()) {
+                        invokeCallback.operationFail(new 
RemotingSendRequestException(channel.remoteAddress().toString(), getCause()));
+                    } else if (isTimeout()) {
+                        invokeCallback.operationFail(new 
RemotingTimeoutException(channel.remoteAddress().toString(), 
getTimeoutMillis(), getCause()));
+                    } else {
+                        invokeCallback.operationFail(new 
RemotingException(getRequestCommand().toString(), getCause()));
+                    }
+                }
                 invokeCallback.operationComplete(this);
             }
         }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java
index 133e0ed314..5328e8845d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java
@@ -160,31 +160,38 @@ public class RpcClientImpl implements RpcClient {
         InvokeCallback callback = new InvokeCallback() {
             @Override
             public void operationComplete(ResponseFuture responseFuture) {
-                RemotingCommand responseCommand = 
responseFuture.getResponseCommand();
-                if (responseCommand == null) {
-                    processFailedResponse(addr, requestCommand, 
responseFuture, rpcResponsePromise);
-                    return;
-                }
+
+            }
+
+            @Override
+            public void operationSucceed(RemotingCommand response) {
                 try {
-                    switch (responseCommand.getCode()) {
+                    switch (response.getCode()) {
                         case ResponseCode.SUCCESS:
                         case ResponseCode.PULL_NOT_FOUND:
                         case ResponseCode.PULL_RETRY_IMMEDIATELY:
                         case ResponseCode.PULL_OFFSET_MOVED:
                             PullMessageResponseHeader responseHeader =
-                                    (PullMessageResponseHeader) 
responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
-                            rpcResponsePromise.setSuccess(new 
RpcResponse(responseCommand.getCode(), responseHeader, 
responseCommand.getBody()));
+                                (PullMessageResponseHeader) 
response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+                            rpcResponsePromise.setSuccess(new 
RpcResponse(response.getCode(), responseHeader, response.getBody()));
                         default:
-                            RpcResponse rpcResponse = new RpcResponse(new 
RpcException(responseCommand.getCode(), "unexpected remote response code"));
+                            RpcResponse rpcResponse = new RpcResponse(new 
RpcException(response.getCode(), "unexpected remote response code"));
                             rpcResponsePromise.setSuccess(rpcResponse);
 
                     }
                 } catch (Exception e) {
-                    String errorMessage = "process failed. addr: " + addr + ", 
timeoutMillis: " + responseFuture.getTimeoutMillis() + ". Request: " + 
requestCommand;
-                    RpcResponse  rpcResponse = new RpcResponse(new 
RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, e));
+                    String errorMessage = "process failed. addr: " + addr + ", 
timeoutMillis: " + timeoutMillis + ". Request: " + requestCommand;
+                    RpcResponse rpcResponse = new RpcResponse(new 
RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, e));
                     rpcResponsePromise.setSuccess(rpcResponse);
                 }
             }
+
+            @Override
+            public void operationFail(Throwable throwable) {
+                String errorMessage = "process failed. addr: " + addr + ". 
Request: " + requestCommand;
+                RpcResponse rpcResponse = new RpcResponse(new 
RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, throwable));
+                rpcResponsePromise.setSuccess(rpcResponse);
+            }
         };
 
         this.remotingClient.invokeAsync(addr, requestCommand, timeoutMillis, 
callback);
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java 
b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
index 90072960b5..d0da0eb2ef 100644
--- 
a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
+++ 
b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
@@ -26,12 +26,12 @@ import 
org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.remoting.netty.ResponseFuture;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.junit.AfterClass;
@@ -40,7 +40,6 @@ import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 public class RemotingServerTest {
     private static RemotingServer remotingServer;
@@ -122,10 +121,19 @@ public class RemotingServerTest {
         remotingClient.invokeAsync("localhost:" + 
remotingServer.localListenPort(), request, 1000 * 3, new InvokeCallback() {
             @Override
             public void operationComplete(ResponseFuture responseFuture) {
+
+            }
+
+            @Override
+            public void operationSucceed(RemotingCommand response) {
                 latch.countDown();
-                assertTrue(responseFuture != null);
-                
assertThat(responseFuture.getResponseCommand().getLanguage()).isEqualTo(LanguageCode.JAVA);
-                
assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
+                
assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
+                assertThat(response.getExtFields()).hasSize(2);
+            }
+
+            @Override
+            public void operationFail(Throwable throwable) {
+
             }
         });
         latch.await();
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannel.java
similarity index 57%
rename from 
client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java
rename to 
remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannel.java
index 80188832eb..8ddcdf35df 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannel.java
@@ -15,23 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.client.impl;
+package org.apache.rocketmq.remoting.netty;
 
-import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.netty.ResponseFuture;
-
-public abstract class BaseInvokeCallback implements InvokeCallback {
-    private final MQClientAPIImpl mqClientAPI;
-
-    public BaseInvokeCallback(MQClientAPIImpl mqClientAPI) {
-        this.mqClientAPI = mqClientAPI;
-    }
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.local.LocalChannel;
 
+public class MockChannel extends LocalChannel {
     @Override
-    public void operationComplete(final ResponseFuture responseFuture) {
-        mqClientAPI.execRpcHooksAfterRequest(responseFuture);
-        onComplete(responseFuture);
+    public ChannelFuture writeAndFlush(Object msg) {
+        return new MockChannelPromise(MockChannel.this);
     }
-
-    public abstract void onComplete(final ResponseFuture responseFuture);
 }
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannelPromise.java
 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannelPromise.java
new file mode 100644
index 0000000000..9c3a354871
--- /dev/null
+++ 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannelPromise.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.jetbrains.annotations.NotNull;
+
+public class MockChannelPromise implements ChannelPromise {
+    protected Channel channel;
+
+    public MockChannelPromise(Channel channel) {
+        this.channel = channel;
+    }
+
+    @Override
+    public Channel channel() {
+        return channel;
+    }
+
+    @Override
+    public ChannelPromise setSuccess(Void result) {
+        return this;
+    }
+
+    @Override
+    public ChannelPromise setSuccess() {
+        return this;
+    }
+
+    @Override
+    public boolean trySuccess() {
+        return false;
+    }
+
+    @Override
+    public ChannelPromise setFailure(Throwable cause) {
+        return this;
+    }
+
+    @Override
+    public ChannelPromise addListener(GenericFutureListener<? extends Future<? 
super Void>> listener) {
+        return this;
+    }
+
+    @Override
+    public ChannelPromise addListeners(GenericFutureListener<? extends 
Future<? super Void>>... listeners) {
+        return this;
+    }
+
+    @Override
+    public ChannelPromise removeListener(GenericFutureListener<? extends 
Future<? super Void>> listener) {
+        return this;
+    }
+
+    @Override
+    public ChannelPromise removeListeners(GenericFutureListener<? extends 
Future<? super Void>>... listeners) {
+        return this;
+    }
+
+    @Override
+    public ChannelPromise sync() throws InterruptedException {
+        return this;
+    }
+
+    @Override
+    public ChannelPromise syncUninterruptibly() {
+        return this;
+    }
+
+    @Override
+    public ChannelPromise await() throws InterruptedException {
+        return this;
+    }
+
+    @Override
+    public ChannelPromise awaitUninterruptibly() {
+        return this;
+    }
+
+    @Override
+    public ChannelPromise unvoid() {
+        return this;
+    }
+
+    @Override
+    public boolean isVoid() {
+        return false;
+    }
+
+    @Override
+    public boolean trySuccess(Void result) {
+        return false;
+    }
+
+    @Override
+    public boolean tryFailure(Throwable cause) {
+        return false;
+    }
+
+    @Override
+    public boolean setUncancellable() {
+        return false;
+    }
+
+    @Override
+    public boolean isSuccess() {
+        return false;
+    }
+
+    @Override
+    public boolean isCancellable() {
+        return false;
+    }
+
+    @Override
+    public Throwable cause() {
+        return null;
+    }
+
+    @Override
+    public boolean await(long timeout, TimeUnit unit) throws 
InterruptedException {
+        return false;
+    }
+
+    @Override
+    public boolean await(long timeoutMillis) throws InterruptedException {
+        return false;
+    }
+
+    @Override
+    public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
+        return false;
+    }
+
+    @Override
+    public boolean awaitUninterruptibly(long timeoutMillis) {
+        return false;
+    }
+
+    @Override
+    public Void getNow() {
+        return null;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public boolean isDone() {
+        return false;
+    }
+
+    @Override
+    public Void get() throws InterruptedException, ExecutionException {
+        return null;
+    }
+
+    @Override
+    public Void get(long timeout,
+        @NotNull java.util.concurrent.TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
+        return null;
+    }
+}
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
index 8381c132b7..dbbea86ea2 100644
--- 
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
+++ 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
@@ -39,9 +39,19 @@ public class NettyRemotingAbstractTest {
         final Semaphore semaphore = new Semaphore(0);
         ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new 
InvokeCallback() {
             @Override
-            public void operationComplete(final ResponseFuture responseFuture) 
{
+            public void operationComplete(ResponseFuture responseFuture) {
+
+            }
+
+            @Override
+            public void operationSucceed(RemotingCommand response) {
                 assertThat(semaphore.availablePermits()).isEqualTo(0);
             }
+
+            @Override
+            public void operationFail(Throwable throwable) {
+
+            }
         }, new SemaphoreReleaseOnlyOnce(semaphore));
 
         remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
@@ -75,9 +85,19 @@ public class NettyRemotingAbstractTest {
         final Semaphore semaphore = new Semaphore(0);
         ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new 
InvokeCallback() {
             @Override
-            public void operationComplete(final ResponseFuture responseFuture) 
{
+            public void operationComplete(ResponseFuture responseFuture) {
+
+            }
+
+            @Override
+            public void operationSucceed(RemotingCommand response) {
                 assertThat(semaphore.availablePermits()).isEqualTo(0);
             }
+
+            @Override
+            public void operationFail(Throwable throwable) {
+
+            }
         }, new SemaphoreReleaseOnlyOnce(semaphore));
 
         remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
@@ -98,7 +118,18 @@ public class NettyRemotingAbstractTest {
         // mock timeout
         ResponseFuture responseFuture = new ResponseFuture(null, dummyId, 
-1000, new InvokeCallback() {
             @Override
-            public void operationComplete(final ResponseFuture responseFuture) 
{
+            public void operationComplete(ResponseFuture responseFuture) {
+
+            }
+
+            @Override
+            public void operationSucceed(RemotingCommand response) {
+
+            }
+
+            @Override
+            public void operationFail(Throwable throwable) {
+
             }
         }, null);
         remotingAbstract.responseTable.putIfAbsent(dummyId, responseFuture);
@@ -111,7 +142,22 @@ public class NettyRemotingAbstractTest {
         final Semaphore semaphore = new Semaphore(0);
         RemotingCommand request = RemotingCommand.createRequestCommand(1, 
null);
         ResponseFuture responseFuture = new ResponseFuture(null, 1, request, 
3000,
-            responseFuture1 -> 
assertThat(semaphore.availablePermits()).isEqualTo(0), new 
SemaphoreReleaseOnlyOnce(semaphore));
+            new InvokeCallback() {
+                @Override
+                public void operationComplete(ResponseFuture responseFuture) {
+
+                }
+
+                @Override
+                public void operationSucceed(RemotingCommand response) {
+                    assertThat(semaphore.availablePermits()).isEqualTo(0);
+                }
+
+                @Override
+                public void operationFail(Throwable throwable) {
+
+                }
+            }, new SemaphoreReleaseOnlyOnce(semaphore));
 
         remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
         RemotingCommand response = RemotingCommand.createResponseCommand(0, 
"Foo");
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
index 8fabbb21d0..e72e7bd53e 100644
--- 
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+++ 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
@@ -16,10 +16,17 @@
  */
 package org.apache.rocketmq.remoting.netty;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.local.LocalChannel;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -29,23 +36,33 @@ import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 @RunWith(MockitoJUnitRunner.class)
 public class NettyRemotingClientTest {
     @Spy
     private NettyRemotingClient remotingClient = new NettyRemotingClient(new 
NettyClientConfig());
+    @Mock
+    private RPCHook rpcHookMock;
 
     @Test
-    public void testSetCallbackExecutor() throws NoSuchFieldException, 
IllegalAccessException {        
+    public void testSetCallbackExecutor() {
         ExecutorService customized = Executors.newCachedThreadPool();
         remotingClient.setCallbackExecutor(customized);
         assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
@@ -61,7 +78,7 @@ public class NettyRemotingClientTest {
             InvokeCallback callback = invocation.getArgument(3);
             ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
             responseFuture.setResponseCommand(response);
-            callback.operationComplete(responseFuture);
+            callback.operationSucceed(responseFuture.getResponseCommand());
             return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
 
@@ -78,9 +95,7 @@ public class NettyRemotingClientTest {
         response.setCode(ResponseCode.SUCCESS);
         doAnswer(invocation -> {
             InvokeCallback callback = invocation.getArgument(3);
-            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
-            responseFuture.setSendRequestOK(false);
-            callback.operationComplete(responseFuture);
+            callback.operationFail(new RemotingSendRequestException(null));
             return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
 
@@ -97,8 +112,7 @@ public class NettyRemotingClientTest {
         response.setCode(ResponseCode.SUCCESS);
         doAnswer(invocation -> {
             InvokeCallback callback = invocation.getArgument(3);
-            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), -1L, null, null);
-            callback.operationComplete(responseFuture);
+            callback.operationFail(new RemotingTimeoutException(""));
             return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
 
@@ -115,8 +129,7 @@ public class NettyRemotingClientTest {
         response.setCode(ResponseCode.SUCCESS);
         doAnswer(invocation -> {
             InvokeCallback callback = invocation.getArgument(3);
-            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
-            callback.operationComplete(responseFuture);
+            callback.operationFail(new RemotingException(null));
             return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
 
@@ -134,4 +147,158 @@ public class NettyRemotingClientTest {
             assertThat(e.getMessage()).contains(addr);
         }
     }
+
+    @Test
+    public void testInvoke0() throws ExecutionException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        Channel channel = new MockChannel() {
+            @Override
+            public ChannelFuture writeAndFlush(Object msg) {
+                ResponseFuture responseFuture = remotingClient.responseTable.get(request.getOpaque());
+                responseFuture.setResponseCommand(response);
+                responseFuture.executeInvokeCallback();
+                return super.writeAndFlush(msg);
+            }
+        };
+        CompletableFuture<ResponseFuture> future = remotingClient.invoke0(channel, request, 1000L);
+        assertThat(future.get().getResponseCommand()).isEqualTo(response);
+    }
+
+    @Test
+    public void testInvoke0WithException() {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        Channel channel = new MockChannel() {
+            @Override
+            public ChannelFuture writeAndFlush(Object msg) {
+                ResponseFuture responseFuture = remotingClient.responseTable.get(request.getOpaque());
+                responseFuture.executeInvokeCallback();
+                return super.writeAndFlush(msg);
+            }
+        };
+        CompletableFuture<ResponseFuture> future = remotingClient.invoke0(channel, request, 1000L);
+        assertThatThrownBy(future::get).getCause().isInstanceOf(RemotingException.class);
+    }
+
+    @Test
+    public void testInvokeSync() throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        remotingClient.registerRPCHook(rpcHookMock);
+
+        Channel channel = new LocalChannel();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        ResponseFuture responseFuture = new ResponseFuture(channel, request.getOpaque(), request, 1000, new InvokeCallback() {
+            @Override
+            public void operationComplete(ResponseFuture responseFuture) {
+
+            }
+        }, new SemaphoreReleaseOnlyOnce(new Semaphore(1)));
+        responseFuture.setResponseCommand(response);
+        CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+        future.complete(responseFuture);
+
+        doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong());
+        RemotingCommand actual = remotingClient.invokeSyncImpl(channel, request, 1000);
+        assertThat(actual).isEqualTo(response);
+
+        verify(rpcHookMock).doBeforeRequest(anyString(), eq(request));
+        verify(rpcHookMock).doAfterResponse(anyString(), eq(request), eq(response));
+    }
+
+    @Test
+    public void testInvokeAsync() {
+        remotingClient.registerRPCHook(rpcHookMock);
+        Channel channel = new LocalChannel();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        ResponseFuture responseFuture = new ResponseFuture(channel, request.getOpaque(), request, 1000, new InvokeCallback() {
+            @Override
+            public void operationComplete(ResponseFuture responseFuture) {
+
+            }
+        }, new SemaphoreReleaseOnlyOnce(new Semaphore(1)));
+        responseFuture.setResponseCommand(response);
+        CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+        future.complete(responseFuture);
+
+        doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong());
+
+        InvokeCallback callback = mock(InvokeCallback.class);
+        remotingClient.invokeAsyncImpl(channel, request, 1000, callback);
+        verify(callback, times(1)).operationSucceed(eq(response));
+        verify(callback, times(1)).operationComplete(eq(responseFuture));
+        verify(callback, never()).operationFail(any());
+
+        verify(rpcHookMock).doBeforeRequest(anyString(), eq(request));
+        verify(rpcHookMock).doAfterResponse(anyString(), eq(request), eq(response));
+    }
+
+    @Test
+    public void testInvokeAsyncFail() {
+        remotingClient.registerRPCHook(rpcHookMock);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+
+        Channel channel = new LocalChannel();
+        CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+        future.completeExceptionally(new RemotingException(null));
+
+        doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong());
+
+        InvokeCallback callback = mock(InvokeCallback.class);
+        remotingClient.invokeAsyncImpl(channel, request, 1000, callback);
+        verify(callback, never()).operationSucceed(any());
+        verify(callback, times(1)).operationComplete(any());
+        verify(callback, times(1)).operationFail(any());
+
+        verify(rpcHookMock).doBeforeRequest(anyString(), eq(request));
+        verify(rpcHookMock, never()).doAfterResponse(anyString(), eq(request), any());
+    }
+
+    @Test
+    public void testInvokeImpl() throws ExecutionException, InterruptedException {
+        remotingClient.registerRPCHook(rpcHookMock);
+        Channel channel = new LocalChannel();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        ResponseFuture responseFuture = new ResponseFuture(channel, request.getOpaque(), request, 1000, new InvokeCallback() {
+            @Override
+            public void operationComplete(ResponseFuture responseFuture) {
+
+            }
+        }, new SemaphoreReleaseOnlyOnce(new Semaphore(1)));
+        responseFuture.setResponseCommand(response);
+        CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+        future.complete(responseFuture);
+
+        doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong());
+
+        CompletableFuture<ResponseFuture> future0 = remotingClient.invokeImpl(channel, request, 1000);
+        assertThat(future0.get()).isEqualTo(responseFuture);
+
+        verify(rpcHookMock).doBeforeRequest(anyString(), eq(request));
+        verify(rpcHookMock).doAfterResponse(anyString(), eq(request), eq(response));
+    }
+
+    @Test
+    public void testInvokeImplFail() {
+        remotingClient.registerRPCHook(rpcHookMock);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+
+        Channel channel = new LocalChannel();
+        CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+        future.completeExceptionally(new RemotingException(null));
+
+        doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong());
+
+        assertThatThrownBy(() -> remotingClient.invokeImpl(channel, request, 1000).get()).getCause().isInstanceOf(RemotingException.class);
+
+        verify(rpcHookMock).doBeforeRequest(anyString(), eq(request));
+        verify(rpcHookMock, never()).doAfterResponse(anyString(), eq(request), any());
+    }
 }

Reply via email to