This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 84156084a4 [ISSUE #7321] Refector NettyRemotingAbstract with unify future implementation (#7322) 84156084a4 is described below commit 84156084a4c5228e1d2fe21e068fff330bbc40d1 Author: Zhouxiang Zhan <zhouxz...@apache.org> AuthorDate: Sun Oct 8 11:13:25 2023 +0800 [ISSUE #7321] Refector NettyRemotingAbstract with unify future implementation (#7322) * Refector NettyRemotingAbstract * Add invoke with future method * Deprecate InvokeCallback#operationComplete * Add operationSuccess and operationException for InvokeCallback * fix unit test * fix unit test * Keep InvokeCallback#operationComplete * Optimize invokeAsyncImpl operationComplete * Add unit test for NettyRemotingClient * fix checkstyle --- .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 147 ++++++---- .../apache/rocketmq/client/impl/MQAdminImpl.java | 71 ++--- .../rocketmq/client/impl/MQClientAPIImpl.java | 239 ++++++++-------- .../client/impl/mqclient/MQClientAPIExt.java | 309 +++++++++------------ .../rocketmq/client/impl/MQClientAPIImplTest.java | 12 +- .../proxy/remoting/RemotingProtocolServer.java | 22 +- .../proxy/service/mqclient/MQClientAPIExtTest.java | 97 +++---- .../apache/rocketmq/remoting/InvokeCallback.java | 15 + .../apache/rocketmq/remoting/RemotingClient.java | 27 +- .../remoting/netty/NettyRemotingAbstract.java | 123 +++++--- .../remoting/netty/NettyRemotingClient.java | 33 ++- .../rocketmq/remoting/netty/ResponseFuture.java | 15 + .../rocketmq/remoting/rpc/RpcClientImpl.java | 29 +- .../rocketmq/remoting/RemotingServerTest.java | 22 +- .../rocketmq/remoting/netty/MockChannel.java | 21 +- .../remoting/netty/MockChannelPromise.java | 191 +++++++++++++ .../remoting/netty/NettyRemotingAbstractTest.java | 54 +++- .../remoting/netty/NettyRemotingClientTest.java | 185 +++++++++++- 18 files changed, 1029 insertions(+), 583 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 9dfb8127d6..6fde48dd99 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -73,6 +73,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.BrokerSyncInfo; import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -107,6 +108,8 @@ import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader; @@ -124,8 +127,6 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerReques import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult; import org.apache.rocketmq.remoting.protocol.route.BrokerData; @@ -151,7 +152,6 @@ public class BrokerOuterAPI { private final RpcClient rpcClient; private String nameSrvAddr = null; - public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) { this(nettyClientConfig, new DynamicalExtFieldRPCHook(), new ClientMetadata()); } @@ -459,7 +459,7 @@ public class BrokerOuterAPI { * @param filterServerList * @param oneway * @param timeoutMills - * @param compressed default false + * @param compressed default false * @return */ public List<RegisterBrokerResult> registerBrokerAll( @@ -643,7 +643,6 @@ public class BrokerOuterAPI { queueDatas.add(queueData); final byte[] topicRouteBody = topicRouteData.encode(); - List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { @@ -910,25 +909,33 @@ public class BrokerOuterAPI { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null); request.setBody(requestBody.encode()); - this.remotingClient.invokeAsync(addr, request, timeoutMillis, responseFuture -> { - if (callback == null) { - return; + this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + } - try { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - if (response.getCode() == ResponseCode.SUCCESS) { - LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), - LockBatchResponseBody.class); - Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet(); - callback.onSuccess(messageQueues); - } else { - callback.onException(new MQBrokerException(response.getCode(), response.getRemark())); - } + @Override + public void operationSucceed(RemotingCommand response) { + if (callback == null) { + return; } - } catch (Throwable ignored) { + if (response.getCode() == ResponseCode.SUCCESS) { + LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), + LockBatchResponseBody.class); + Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet(); + callback.onSuccess(messageQueues); + } else { + callback.onException(new MQBrokerException(response.getCode(), response.getRemark())); + } + } + @Override + public void operationFail(Throwable throwable) { + if (callback == null) { + return; + } + callback.onException(throwable); } }); } @@ -942,22 +949,30 @@ public class BrokerOuterAPI { request.setBody(requestBody.encode()); - this.remotingClient.invokeAsync(addr, request, timeoutMillis, responseFuture -> { - if (callback == null) { - return; + this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + } - try { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - if (response.getCode() == ResponseCode.SUCCESS) { - callback.onSuccess(); - } else { - callback.onException(new MQBrokerException(response.getCode(), response.getRemark())); - } + @Override + public void operationSucceed(RemotingCommand response) { + if (callback == null) { + return; } - } catch (Throwable ignored) { + if (response.getCode() == ResponseCode.SUCCESS) { + callback.onSuccess(); + } else { + callback.onException(new MQBrokerException(response.getCode(), response.getRemark())); + } + } + @Override + public void operationFail(Throwable throwable) { + if (callback == null) { + return; + } + callback.onException(throwable); } }); } @@ -983,21 +998,27 @@ public class BrokerOuterAPI { CompletableFuture<SendResult> cf = new CompletableFuture<>(); final String msgId = msg.getMsgId(); try { - this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (null != response) { - SendResult sendResult = null; + this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { try { - sendResult = this.processSendResponse(brokerName, msg, response); + SendResult sendResult = processSendResponse(brokerName, msg, response); cf.complete(sendResult); } catch (MQBrokerException | RemotingCommandException e) { LOGGER.error("processSendResponse in sendMessageToSpecificBrokerAsync failed, msgId=" + msgId, e); cf.completeExceptionally(e); } - } else { - cf.complete(null); } + @Override + public void operationFail(Throwable throwable) { + cf.completeExceptionally(throwable); + } }); } catch (Throwable t) { LOGGER.error("invokeAsync failed in sendMessageToSpecificBrokerAsync, msgId=" + msgId, t); @@ -1057,7 +1078,7 @@ public class BrokerOuterAPI { } if (sendStatus != null) { SendMessageResponseHeader responseHeader = - (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); + (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); //If namespace not null , reset Topic without namespace. String topic = msg.getTopic(); @@ -1073,8 +1094,8 @@ public class BrokerOuterAPI { uniqMsgId = sb.toString(); } SendResult sendResult = new SendResult(sendStatus, - uniqMsgId, - responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); + uniqMsgId, + responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); sendResult.setTransactionId(responseHeader.getTransactionId()); String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH); @@ -1218,8 +1239,9 @@ public class BrokerOuterAPI { /** * Broker try to elect itself as a master in broker set */ - public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String controllerAddress, String clusterName, String brokerName, - Long brokerId) throws Exception { + public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String controllerAddress, String clusterName, + String brokerName, + Long brokerId) throws Exception { final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, requestHeader); @@ -1237,7 +1259,8 @@ public class BrokerOuterAPI { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public GetNextBrokerIdResponseHeader getNextBrokerId(final String clusterName, final String brokerName, final String controllerAddress) throws Exception { + public GetNextBrokerIdResponseHeader getNextBrokerId(final String clusterName, final String brokerName, + final String controllerAddress) throws Exception { final GetNextBrokerIdRequestHeader requestHeader = new GetNextBrokerIdRequestHeader(clusterName, brokerName); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_NEXT_BROKER_ID, requestHeader); final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); @@ -1248,7 +1271,8 @@ public class BrokerOuterAPI { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final String brokerName, final Long brokerId, final String registerCheckCode, final String controllerAddress) throws Exception { + public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName, final String brokerName, + final Long brokerId, final String registerCheckCode, final String controllerAddress) throws Exception { final ApplyBrokerIdRequestHeader requestHeader = new ApplyBrokerIdRequestHeader(clusterName, brokerName, brokerId, registerCheckCode); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_APPLY_BROKER_ID, requestHeader); final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); @@ -1259,7 +1283,9 @@ public class BrokerOuterAPI { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> registerBrokerToController(final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, final String controllerAddress) throws Exception { + public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>> registerBrokerToController( + final String clusterName, final String brokerName, final Long brokerId, final String brokerAddress, + final String controllerAddress) throws Exception { final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId, brokerAddress); final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader); final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); @@ -1355,16 +1381,25 @@ public class BrokerOuterAPI { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); CompletableFuture<PullResult> pullResultFuture = new CompletableFuture<>(); - this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - if (responseFuture.getCause() != null) { - pullResultFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>())); - return; + this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + } - try { - PullResultExt pullResultExt = this.processPullResponse(responseFuture.getResponseCommand(), brokerAddr); - this.processPullResult(pullResultExt, brokerName, queueId); - pullResultFuture.complete(pullResultExt); - } catch (Exception e) { + + @Override + public void operationSucceed(RemotingCommand response) { + try { + PullResultExt pullResultExt = processPullResponse(response, brokerAddr); + processPullResult(pullResultExt, brokerName, queueId); + pullResultFuture.complete(pullResultExt); + } catch (Exception e) { + pullResultFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>())); + } + } + + @Override + public void operationFail(Throwable throwable) { pullResultFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>())); } }); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 1ef3a94835..83835bd3d3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -44,6 +44,8 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -55,8 +57,6 @@ import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public class MQAdminImpl { @@ -357,44 +357,51 @@ public class MQAdminImpl { new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { try { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - QueryMessageResponseHeader responseHeader = null; - try { - responseHeader = - (QueryMessageResponseHeader) response - .decodeCommandCustomHeader(QueryMessageResponseHeader.class); - } catch (RemotingCommandException e) { - log.error("decodeCommandCustomHeader exception", e); - return; - } - - List<MessageExt> wrappers = - MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true); - - QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers); - try { - lock.writeLock().lock(); - queryResultList.add(qr); - } finally { - lock.writeLock().unlock(); - } - break; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + QueryMessageResponseHeader responseHeader = null; + try { + responseHeader = + (QueryMessageResponseHeader) response + .decodeCommandCustomHeader(QueryMessageResponseHeader.class); + } catch (RemotingCommandException e) { + log.error("decodeCommandCustomHeader exception", e); + return; } - default: - log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); - break; + + List<MessageExt> wrappers = + MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true); + + QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers); + try { + lock.writeLock().lock(); + queryResultList.add(qr); + } finally { + lock.writeLock().unlock(); + } + break; } - } else { - log.warn("getResponseCommand return null"); + default: + log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); + break; } + } finally { countDownLatch.countDown(); } } + + @Override + public void operationFail(Throwable throwable) { + log.error("queryMessage error, requestHeader={}", requestHeader); + countDownLatch.countDown(); + } }, isUniqKey); } catch (Exception e) { log.warn("queryMessage exception", e); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 3201a493f7..2407e57373 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.Validators; -import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.consumer.AckCallback; import org.apache.rocketmq.client.consumer.AckResult; import org.apache.rocketmq.client.consumer.AckStatus; @@ -653,10 +652,13 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { - long cost = System.currentTimeMillis() - beginStartTime; - RemotingCommand response = responseFuture.getResponseCommand(); - if (null == sendCallback && response != null) { + } + + @Override + public void operationSucceed(RemotingCommand response) { + long cost = System.currentTimeMillis() - beginStartTime; + if (null == sendCallback) { try { SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr); if (context != null && sendResult != null) { @@ -666,46 +668,47 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { } catch (Throwable e) { } - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true); + producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, false, true); return; } - if (response != null) { + try { + SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr); + assert sendResult != null; + if (context != null) { + context.setSendResult(sendResult); + context.getProducer().executeSendMessageHookAfter(context); + } + try { - SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr); - assert sendResult != null; - if (context != null) { - context.setSendResult(sendResult); - context.getProducer().executeSendMessageHookAfter(context); - } + sendCallback.onSuccess(sendResult); + } catch (Throwable e) { + } - try { - sendCallback.onSuccess(sendResult); - } catch (Throwable e) { - } + producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, false, true); + } catch (Exception e) { + producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, true, true); + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, e, context, false, producer); + } + } - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true); - } catch (Exception e) { - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true); - onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, e, context, false, producer); - } + @Override + public void operationFail(Throwable throwable) { + producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, true, true); + long cost = System.currentTimeMillis() - beginStartTime; + if (throwable instanceof RemotingSendRequestException) { + MQClientException ex = new MQClientException("send request failed", throwable); + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, ex, context, true, producer); + } else if (throwable instanceof RemotingTimeoutException) { + MQClientException ex = new MQClientException("wait response timeout, cost=" + cost, throwable); + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, ex, context, true, producer); } else { - producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true); - if (!responseFuture.isSendRequestOK()) { - MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); - } else if (responseFuture.isTimeout()) { - MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", - responseFuture.getCause()); - onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); - } else { - MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); - onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); - } + MQClientException ex = new MQClientException("unknow reseaon", throwable); + onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, + retryTimesWhenSendFailed, times, ex, context, true, producer); } } }); @@ -857,30 +860,25 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { final long timeoutMillis, final PopCallback popCallback ) throws RemotingException, InterruptedException { final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader); - this.remotingClient.invokeAsync(addr, request, timeoutMillis, new BaseInvokeCallback(MQClientAPIImpl.this) { + this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override - public void onComplete(ResponseFuture responseFuture) { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - try { - PopResult - popResult = MQClientAPIImpl.this.processPopResponse(brokerName, response, requestHeader.getTopic(), requestHeader); - assert popResult != null; - popCallback.onSuccess(popResult); - } catch (Exception e) { - popCallback.onException(e); - } - } else { - if (!responseFuture.isSendRequestOK()) { - popCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); - } else if (responseFuture.isTimeout()) { - popCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, - responseFuture.getCause())); - } else { - popCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause())); - } + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { + try { + PopResult popResult = MQClientAPIImpl.this.processPopResponse(brokerName, response, requestHeader.getTopic(), requestHeader); + popCallback.onSuccess(popResult); + } catch (Exception e) { + popCallback.onException(e); } } + @Override + public void operationFail(Throwable throwable) { + popCallback.onException(throwable); + } }); } @@ -959,34 +957,26 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { request.setBody(requestBody.encode()); } } - this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) { + this.remotingClient.invokeAsync(addr, request, timeOut, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } @Override - public void onComplete(ResponseFuture responseFuture) { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - try { - AckResult ackResult = new AckResult(); - if (ResponseCode.SUCCESS == response.getCode()) { - ackResult.setStatus(AckStatus.OK); - } else { - ackResult.setStatus(AckStatus.NO_EXIST); - } - ackCallback.onSuccess(ackResult); - } catch (Exception e) { - ackCallback.onException(e); - } + public void operationSucceed(RemotingCommand response) { + AckResult ackResult = new AckResult(); + if (ResponseCode.SUCCESS == response.getCode()) { + ackResult.setStatus(AckStatus.OK); } else { - if (!responseFuture.isSendRequestOK()) { - ackCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); - } else if (responseFuture.isTimeout()) { - ackCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, - responseFuture.getCause())); - } else { - ackCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeOut + ". Request: " + request, responseFuture.getCause())); - } + ackResult.setStatus(AckStatus.NO_EXIST); } + ackCallback.onSuccess(ackResult); + } + @Override + public void operationFail(Throwable throwable) { + ackCallback.onException(throwable); } }); } @@ -999,39 +989,37 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { final AckCallback ackCallback ) throws RemotingException, MQBrokerException, InterruptedException { final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader); - this.remotingClient.invokeAsync(addr, request, timeoutMillis, new BaseInvokeCallback(MQClientAPIImpl.this) { + this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override - public void onComplete(ResponseFuture responseFuture) { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - try { - ChangeInvisibleTimeResponseHeader responseHeader = (ChangeInvisibleTimeResponseHeader) response.decodeCommandCustomHeader(ChangeInvisibleTimeResponseHeader.class); - AckResult ackResult = new AckResult(); - if (ResponseCode.SUCCESS == response.getCode()) { - ackResult.setStatus(AckStatus.OK); - ackResult.setPopTime(responseHeader.getPopTime()); - ackResult.setExtraInfo(ExtraInfoUtil - .buildExtraInfo(requestHeader.getOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), - responseHeader.getReviveQid(), requestHeader.getTopic(), brokerName, requestHeader.getQueueId()) + MessageConst.KEY_SEPARATOR - + requestHeader.getOffset()); - } else { - ackResult.setStatus(AckStatus.NO_EXIST); - } - ackCallback.onSuccess(ackResult); - } catch (Exception e) { - ackCallback.onException(e); - } - } else { - if (!responseFuture.isSendRequestOK()) { - ackCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); - } else if (responseFuture.isTimeout()) { - ackCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, - responseFuture.getCause())); + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { + try { + ChangeInvisibleTimeResponseHeader responseHeader = (ChangeInvisibleTimeResponseHeader) response.decodeCommandCustomHeader(ChangeInvisibleTimeResponseHeader.class); + AckResult ackResult = new AckResult(); + if (ResponseCode.SUCCESS == response.getCode()) { + ackResult.setStatus(AckStatus.OK); + ackResult.setPopTime(responseHeader.getPopTime()); + ackResult.setExtraInfo(ExtraInfoUtil + .buildExtraInfo(requestHeader.getOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), + responseHeader.getReviveQid(), requestHeader.getTopic(), brokerName, requestHeader.getQueueId()) + MessageConst.KEY_SEPARATOR + + requestHeader.getOffset()); } else { - ackCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause())); + ackResult.setStatus(AckStatus.NO_EXIST); } + ackCallback.onSuccess(ackResult); + } catch (Exception e) { + ackCallback.onException(e); } } + + @Override + public void operationFail(Throwable throwable) { + ackCallback.onException(throwable); + } }); } @@ -1044,26 +1032,23 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - try { - PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr); - assert pullResult != null; - pullCallback.onSuccess(pullResult); - } catch (Exception e) { - pullCallback.onException(e); - } - } else { - if (!responseFuture.isSendRequestOK()) { - pullCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); - } else if (responseFuture.isTimeout()) { - pullCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, - responseFuture.getCause())); - } else { - pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause())); - } + + } + + @Override + public void operationSucceed(RemotingCommand response) { + try { + PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr); + pullCallback.onSuccess(pullResult); + } catch (Exception e) { + pullCallback.onException(e); } } + + @Override + public void operationFail(Throwable throwable) { + pullCallback.onException(throwable); + } }); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java index d7c8ef8d92..f3102e1759 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java @@ -30,7 +30,6 @@ import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.OffsetNotFoundException; import org.apache.rocketmq.client.impl.ClientRemotingProcessor; import org.apache.rocketmq.client.impl.CommunicationMode; @@ -47,6 +46,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; @@ -106,19 +106,6 @@ public class MQClientAPIExt extends MQClientAPIImpl { return false; } - protected static MQClientException processNullResponseErr(ResponseFuture responseFuture) { - MQClientException ex; - if (!responseFuture.isSendRequestOK()) { - ex = new MQClientException("send request failed", responseFuture.getCause()); - } else if (responseFuture.isTimeout()) { - ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", - responseFuture.getCause()); - } else { - ex = new MQClientException("unknown reason", responseFuture.getCause()); - } - return ex; - } - public CompletableFuture<Void> sendHeartbeatOneway( String brokerAddr, HeartbeatData heartbeatData, @@ -146,24 +133,15 @@ public class MQClientAPIExt extends MQClientAPIImpl { request.setLanguage(clientConfig.getLanguage()); request.setBody(heartbeatData.encode()); - CompletableFuture<Integer> future = new CompletableFuture<>(); - try { - this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - if (ResponseCode.SUCCESS == response.getCode()) { - future.complete(response.getVersion()); - } else { - future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr)); - } - } else { - future.completeExceptionally(processNullResponseErr(responseFuture)); - } - }); - } catch (Throwable t) { - future.completeExceptionally(t); - } - return future; + return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> { + CompletableFuture<Integer> future0 = new CompletableFuture<>(); + if (ResponseCode.SUCCESS == response.getCode()) { + future0.complete(response.getVersion()); + } else { + future0.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr)); + } + return future0; + }); } public CompletableFuture<SendResult> sendMessageAsync( @@ -177,24 +155,15 @@ public class MQClientAPIExt extends MQClientAPIImpl { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); request.setBody(msg.getBody()); - CompletableFuture<SendResult> future = new CompletableFuture<>(); - try { - this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - try { - future.complete(this.processSendResponse(brokerName, msg, response, brokerAddr)); - } catch (Exception e) { - future.completeExceptionally(e); - } - } else { - future.completeExceptionally(processNullResponseErr(responseFuture)); - } - }); - } catch (Throwable t) { - future.completeExceptionally(t); - } - return future; + return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> { + CompletableFuture<SendResult> future0 = new CompletableFuture<>(); + try { + future0.complete(this.processSendResponse(brokerName, msg, response, brokerAddr)); + } catch (Exception e) { + future0.completeExceptionally(e); + } + return future0; + }); } public CompletableFuture<SendResult> sendMessageAsync( @@ -216,17 +185,14 @@ public class MQClientAPIExt extends MQClientAPIImpl { msgBatch.setBody(body); request.setBody(body); - this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - try { - future.complete(this.processSendResponse(brokerName, msgBatch, response, brokerAddr)); - } catch (Exception e) { - future.completeExceptionally(e); - } - } else { - future.completeExceptionally(processNullResponseErr(responseFuture)); + return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> { + CompletableFuture<SendResult> future0 = new CompletableFuture<>(); + try { + future0.complete(processSendResponse(brokerName, msgBatch, response, brokerAddr)); + } catch (Exception e) { + future0.completeExceptionally(e); } + return future0; }); } catch (Throwable t) { future.completeExceptionally(t); @@ -240,21 +206,7 @@ public class MQClientAPIExt extends MQClientAPIImpl { long timeoutMillis ) { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); - - CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); - try { - this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - future.complete(response); - } else { - future.completeExceptionally(processNullResponseErr(responseFuture)); - } - }); - } catch (Throwable t) { - future.completeExceptionally(t); - } - return future; + return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis); } public CompletableFuture<PopResult> popMessageAsync( @@ -402,38 +354,31 @@ public class MQClientAPIExt extends MQClientAPIImpl { QueryConsumerOffsetRequestHeader requestHeader, long timeoutMillis ) { - CompletableFuture<Long> future = new CompletableFuture<>(); - try { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader); - this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - try { - QueryConsumerOffsetResponseHeader responseHeader = - (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); - future.complete(responseHeader.getOffset()); - } catch (RemotingCommandException e) { - future.completeExceptionally(e); - } - break; - } - case ResponseCode.QUERY_NOT_FOUND: { - future.completeExceptionally(new OffsetNotFoundException(response.getCode(), response.getRemark(), brokerAddr)); - break; - } - default: - break; + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader); + return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> { + CompletableFuture<Long> future0 = new CompletableFuture<>(); + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + try { + QueryConsumerOffsetResponseHeader responseHeader = + (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); + future0.complete(responseHeader.getOffset()); + } catch (RemotingCommandException e) { + future0.completeExceptionally(e); } - } else { - future.completeExceptionally(processNullResponseErr(responseFuture)); + break; } - }); - } catch (Throwable t) { - future.completeExceptionally(t); - } - return future; + case ResponseCode.QUERY_NOT_FOUND: { + future0.completeExceptionally(new OffsetNotFoundException(response.getCode(), response.getRemark(), brokerAddr)); + break; + } + default: { + future0.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark())); + break; + } + } + return future0; + }); } public CompletableFuture<Void> updateConsumerOffsetOneWay( @@ -461,9 +406,14 @@ public class MQClientAPIExt extends MQClientAPIImpl { CompletableFuture<List<String>> future = new CompletableFuture<>(); try { - this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { + this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { switch (response.getCode()) { case ResponseCode.SUCCESS: { if (response.getBody() != null) { @@ -485,8 +435,11 @@ public class MQClientAPIExt extends MQClientAPIImpl { break; } future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark())); - } else { - future.completeExceptionally(processNullResponseErr(responseFuture)); + } + + @Override + public void operationFail(Throwable throwable) { + future.completeExceptionally(throwable); } }); } catch (Throwable t) { @@ -501,9 +454,14 @@ public class MQClientAPIExt extends MQClientAPIImpl { CompletableFuture<Long> future = new CompletableFuture<>(); try { - this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { + this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { if (ResponseCode.SUCCESS == response.getCode()) { try { GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class); @@ -513,8 +471,11 @@ public class MQClientAPIExt extends MQClientAPIImpl { } } future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark())); - } else { - future.completeExceptionally(processNullResponseErr(responseFuture)); + } + + @Override + public void operationFail(Throwable throwable) { + future.completeExceptionally(throwable); } }); } catch (Throwable t) { @@ -529,9 +490,14 @@ public class MQClientAPIExt extends MQClientAPIImpl { CompletableFuture<Long> future = new CompletableFuture<>(); try { - this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { + this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { if (ResponseCode.SUCCESS == response.getCode()) { try { GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); @@ -541,8 +507,11 @@ public class MQClientAPIExt extends MQClientAPIImpl { } } future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark())); - } else { - future.completeExceptionally(processNullResponseErr(responseFuture)); + } + + @Override + public void operationFail(Throwable throwable) { + future.completeExceptionally(throwable); } }); } catch (Throwable t) { @@ -555,57 +524,41 @@ public class MQClientAPIExt extends MQClientAPIImpl { long timeoutMillis) { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); - CompletableFuture<Long> future = new CompletableFuture<>(); - try { - this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - if (response.getCode() == ResponseCode.SUCCESS) { - try { - SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); - future.complete(responseHeader.getOffset()); - } catch (Throwable t) { - future.completeExceptionally(t); - } - } - future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark())); - } else { - future.completeExceptionally(processNullResponseErr(responseFuture)); + return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> { + CompletableFuture<Long> future0 = new CompletableFuture<>(); + if (response.getCode() == ResponseCode.SUCCESS) { + try { + SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); + future0.complete(responseHeader.getOffset()); + } catch (Throwable t) { + future0.completeExceptionally(t); } - }); - } catch (Throwable t) { - future.completeExceptionally(t); - } - return future; + } else { + future0.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark())); + } + return future0; + }); } public CompletableFuture<Set<MessageQueue>> lockBatchMQWithFuture(String brokerAddr, LockBatchRequestBody requestBody, long timeoutMillis) { - CompletableFuture<Set<MessageQueue>> future = new CompletableFuture<>(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null); request.setBody(requestBody.encode()); - try { - this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - if (response.getCode() == ResponseCode.SUCCESS) { - try { - LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class); - Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet(); - future.complete(messageQueues); - } catch (Throwable t) { - future.completeExceptionally(t); - } - } - future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark())); - } else { - future.completeExceptionally(processNullResponseErr(responseFuture)); + return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> { + CompletableFuture<Set<MessageQueue>> future0 = new CompletableFuture<>(); + if (response.getCode() == ResponseCode.SUCCESS) { + try { + LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class); + Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet(); + future0.complete(messageQueues); + } catch (Throwable t) { + future0.completeExceptionally(t); } - }); - } catch (Exception e) { - future.completeExceptionally(e); - } - return future; + } else { + future0.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark())); + } + return future0; + }); } public CompletableFuture<Void> unlockBatchMQOneway(String brokerAddr, @@ -624,25 +577,21 @@ public class MQClientAPIExt extends MQClientAPIImpl { public CompletableFuture<Boolean> notification(String brokerAddr, NotificationRequestHeader requestHeader, long timeoutMillis) { - CompletableFuture<Boolean> future = new CompletableFuture<>(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFICATION, requestHeader); - try { - this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenAccept(response -> { - if (response.getCode() == ResponseCode.SUCCESS) { - try { - NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.decodeCommandCustomHeader(NotificationResponseHeader.class); - future.complete(responseHeader.isHasMsg()); - } catch (Throwable t) { - future.completeExceptionally(t); - } - } else { - future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark())); + return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> { + CompletableFuture<Boolean> future0 = new CompletableFuture<>(); + if (response.getCode() == ResponseCode.SUCCESS) { + try { + NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.decodeCommandCustomHeader(NotificationResponseHeader.class); + future0.complete(responseHeader.isHasMsg()); + } catch (Throwable t) { + future0.completeExceptionally(t); } - }); - } catch (Throwable t) { - future.completeExceptionally(t); - } - return future; + } else { + future0.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark())); + } + return future0; + }); } public CompletableFuture<RemotingCommand> invoke(String brokerAddr, RemotingCommand request, long timeoutMillis) { diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index d13f2cfe43..c152d38ea5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -212,7 +212,7 @@ public class MQClientAPIImplTest { RemotingCommand request = mock.getArgument(1); ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); responseFuture.setResponseCommand(createSendMessageSuccessResponse(request)); - callback.operationComplete(responseFuture); + callback.operationSucceed(responseFuture.getResponseCommand()); return null; } }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); @@ -386,7 +386,7 @@ public class MQClientAPIImplTest { RemotingCommand request = mock.getArgument(1); ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); responseFuture.setResponseCommand(createSendMessageSuccessResponse(request)); - callback.operationComplete(responseFuture); + callback.operationSucceed(responseFuture.getResponseCommand()); return null; } }).when(remotingClient).invokeAsync(Matchers.anyString(), Matchers.any(RemotingCommand.class), Matchers.anyLong(), Matchers.any(InvokeCallback.class)); @@ -472,7 +472,7 @@ public class MQClientAPIImplTest { message.putUserProperty("key", "value"); response.setBody(MessageDecoder.encode(message, false)); responseFuture.setResponseCommand(response); - callback.operationComplete(responseFuture); + callback.operationSucceed(responseFuture.getResponseCommand()); return null; } }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); @@ -543,7 +543,7 @@ public class MQClientAPIImplTest { message.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, String.valueOf(0)); response.setBody(MessageDecoder.encode(message, false)); responseFuture.setResponseCommand(response); - callback.operationComplete(responseFuture); + callback.operationSucceed(responseFuture.getResponseCommand()); return null; } }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); @@ -585,7 +585,7 @@ public class MQClientAPIImplTest { response.setOpaque(request.getOpaque()); response.setCode(ResponseCode.SUCCESS); responseFuture.setResponseCommand(response); - callback.operationComplete(responseFuture); + callback.operationSucceed(responseFuture.getResponseCommand()); return null; } }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); @@ -622,7 +622,7 @@ public class MQClientAPIImplTest { responseHeader.setPopTime(System.currentTimeMillis()); responseHeader.setInvisibleTime(10 * 1000L); responseFuture.setResponseCommand(response); - callback.operationComplete(responseFuture); + callback.operationSucceed(responseFuture.getResponseCommand()); return null; } }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index fe07090d50..3227d1e1c6 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -26,7 +26,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.AccessValidator; -import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; @@ -51,10 +50,12 @@ import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; import org.apache.rocketmq.proxy.remoting.pipeline.AuthenticationPipeline; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; import org.apache.rocketmq.remoting.ChannelEventListener; +import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.RequestTask; +import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; @@ -239,12 +240,21 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu long timeoutMillis) { CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); try { - this.defaultRemotingServer.invokeAsync(channel, request, timeoutMillis, responseFuture -> { - if (responseFuture.getResponseCommand() == null) { - future.completeExceptionally(new MQClientException("response is null after send request to client", responseFuture.getCause())); - return; + this.defaultRemotingServer.invokeAsync(channel, request, timeoutMillis, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { + future.complete(response); + } + + @Override + public void operationFail(Throwable throwable) { + future.completeExceptionally(throwable); } - future.complete(responseFuture.getResponseCommand()); }); } catch (Throwable t) { future.completeExceptionally(t); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java index 3f3a4ae40c..e2d05b0f5a 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -85,6 +86,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; @RunWith(MockitoJUnitRunner.class) public class MQClientAPIExtTest { @@ -109,13 +111,9 @@ public class MQClientAPIExtTest { @Test public void testSendHeartbeatAsync() throws Exception { - doAnswer((Answer<Void>) mock -> { - InvokeCallback invokeCallback = mock.getArgument(3); - ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000, invokeCallback, null); - responseFuture.putResponse(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "")); - invokeCallback.operationComplete(responseFuture); - return null; - }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any()); + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + future.complete(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "")); + doReturn(future).when(remotingClient).invoke(anyString(), any(RemotingCommand.class), anyLong()); assertNotNull(mqClientAPI.sendHeartbeatAsync(BROKER_ADDR, new HeartbeatData(), TIMEOUT).get()); } @@ -123,20 +121,16 @@ public class MQClientAPIExtTest { @Test public void testSendMessageAsync() throws Exception { AtomicReference<String> msgIdRef = new AtomicReference<>(); - doAnswer((Answer<Void>) mock -> { - InvokeCallback invokeCallback = mock.getArgument(3); - ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000, invokeCallback, null); - RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); - SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) response.readCustomHeader(); - sendMessageResponseHeader.setMsgId(msgIdRef.get()); - sendMessageResponseHeader.setQueueId(0); - sendMessageResponseHeader.setQueueOffset(1L); - response.setCode(ResponseCode.SUCCESS); - response.makeCustomHeaderToNet(); - responseFuture.putResponse(response); - invokeCallback.operationComplete(responseFuture); - return null; - }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any()); + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) response.readCustomHeader(); + sendMessageResponseHeader.setMsgId(msgIdRef.get()); + sendMessageResponseHeader.setQueueId(0); + sendMessageResponseHeader.setQueueOffset(1L); + response.setCode(ResponseCode.SUCCESS); + response.makeCustomHeaderToNet(); + future.complete(response); + doReturn(future).when(remotingClient).invoke(anyString(), any(RemotingCommand.class), anyLong()); MessageExt messageExt = createMessage(); msgIdRef.set(MessageClientIDSetter.getUniqID(messageExt)); @@ -150,20 +144,16 @@ public class MQClientAPIExtTest { @Test public void testSendMessageListAsync() throws Exception { - doAnswer((Answer<Void>) mock -> { - InvokeCallback invokeCallback = mock.getArgument(3); - ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000, invokeCallback, null); - RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); - SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) response.readCustomHeader(); - sendMessageResponseHeader.setMsgId(""); - sendMessageResponseHeader.setQueueId(0); - sendMessageResponseHeader.setQueueOffset(1L); - response.setCode(ResponseCode.SUCCESS); - response.makeCustomHeaderToNet(); - responseFuture.putResponse(response); - invokeCallback.operationComplete(responseFuture); - return null; - }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any()); + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) response.readCustomHeader(); + sendMessageResponseHeader.setMsgId(""); + sendMessageResponseHeader.setQueueId(0); + sendMessageResponseHeader.setQueueOffset(1L); + response.setCode(ResponseCode.SUCCESS); + response.makeCustomHeaderToNet(); + future.complete(response); + doReturn(future).when(remotingClient).invoke(anyString(), any(RemotingCommand.class), anyLong()); List<MessageExt> messageExtList = new ArrayList<>(); StringBuilder sb = new StringBuilder(); @@ -182,13 +172,9 @@ public class MQClientAPIExtTest { @Test public void testSendMessageBackAsync() throws Exception { - doAnswer((Answer<Void>) mock -> { - InvokeCallback invokeCallback = mock.getArgument(3); - ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000, invokeCallback, null); - responseFuture.putResponse(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "")); - invokeCallback.operationComplete(responseFuture); - return null; - }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any()); + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + future.complete(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "")); + doReturn(future).when(remotingClient).invoke(anyString(), any(RemotingCommand.class), anyLong()); RemotingCommand remotingCommand = mqClientAPI.sendMessageBackAsync(BROKER_ADDR, new ConsumerSendMsgBackRequestHeader(), TIMEOUT) .get(); @@ -285,7 +271,7 @@ public class MQClientAPIExtTest { body.setConsumerIdList(clientIds); response.setBody(body.encode()); responseFuture.putResponse(response); - invokeCallback.operationComplete(responseFuture); + invokeCallback.operationSucceed(responseFuture.getResponseCommand()); return null; }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any()); @@ -302,7 +288,7 @@ public class MQClientAPIExtTest { response.setCode(ResponseCode.SYSTEM_ERROR); response.makeCustomHeaderToNet(); responseFuture.putResponse(response); - invokeCallback.operationComplete(responseFuture); + invokeCallback.operationSucceed(responseFuture.getResponseCommand()); return null; }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any()); @@ -322,7 +308,7 @@ public class MQClientAPIExtTest { response.setCode(ResponseCode.SUCCESS); response.makeCustomHeaderToNet(); responseFuture.putResponse(response); - invokeCallback.operationComplete(responseFuture); + invokeCallback.operationSucceed(responseFuture.getResponseCommand()); return null; }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any()); @@ -335,18 +321,15 @@ public class MQClientAPIExtTest { @Test public void testSearchOffsetAsync() throws Exception { long offset = ThreadLocalRandom.current().nextLong(); - doAnswer((Answer<Void>) mock -> { - InvokeCallback invokeCallback = mock.getArgument(3); - ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000, invokeCallback, null); - RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class); - SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader(); - responseHeader.setOffset(offset); - response.setCode(ResponseCode.SUCCESS); - response.makeCustomHeaderToNet(); - responseFuture.putResponse(response); - invokeCallback.operationComplete(responseFuture); - return null; - }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any()); + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class); + SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader(); + responseHeader.setOffset(offset); + response.setCode(ResponseCode.SUCCESS); + response.makeCustomHeaderToNet(); + future.complete(response); + + doReturn(future).when(remotingClient).invoke(anyString(), any(RemotingCommand.class), anyLong()); SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); requestHeader.setTopic(TOPIC); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java b/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java index ce78fa923f..6be4917457 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java @@ -17,7 +17,22 @@ package org.apache.rocketmq.remoting; import org.apache.rocketmq.remoting.netty.ResponseFuture; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; public interface InvokeCallback { + /** + * This method is expected to be invoked after {@link #operationSucceed(RemotingCommand)} + * or {@link #operationFail(Throwable)} + * + * @param responseFuture the returned object contains response or exception + */ void operationComplete(final ResponseFuture responseFuture); + + default void operationSucceed(final RemotingCommand response) { + + } + + default void operationFail(final Throwable throwable) { + + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java index ff0b3df95a..c8389eedb1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java @@ -20,11 +20,11 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public interface RemotingClient extends RemotingService { @@ -51,18 +51,21 @@ public interface RemotingClient extends RemotingService { final long timeoutMillis) { CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); try { - invokeAsync(addr, request, timeoutMillis, responseFuture -> { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { + invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { + + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { future.complete(response); - } else { - if (!responseFuture.isSendRequestOK()) { - future.completeExceptionally(new RemotingSendRequestException(addr, responseFuture.getCause())); - } else if (responseFuture.isTimeout()) { - future.completeExceptionally(new RemotingTimeoutException(addr, timeoutMillis, responseFuture.getCause())); - } else { - future.completeExceptionally(new RemotingException(request.toString(), responseFuture.getCause())); - } + } + + @Override + public void operationFail(Throwable throwable) { + future.completeExceptionally(throwable); } }); } catch (Throwable t) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index fce2de267f..12e66f913c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -23,20 +23,23 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Future; import io.opentelemetry.api.common.AttributesBuilder; -import java.net.SocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import javax.annotation.Nullable; import org.apache.rocketmq.common.AbortProcessException; @@ -125,7 +128,7 @@ public abstract class NettyRemotingAbstract { * Constructor, specifying capacity of one-way and asynchronous semaphores. * * @param permitsOneway Number of permits for one-way requests. - * @param permitsAsync Number of permits for asynchronous requests. + * @param permitsAsync Number of permits for asynchronous requests. */ public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) { this.semaphoreOneway = new Semaphore(permitsOneway, true); @@ -367,8 +370,7 @@ public abstract class NettyRemotingAbstract { responseFuture.release(); } } else { - log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - log.warn(cmd.toString()); + log.warn("receive response, cmd={}, but not matched any request, address={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel())); } } @@ -467,57 +469,68 @@ public abstract class NettyRemotingAbstract { public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { - //get the request id - final int opaque = request.getOpaque(); - try { - final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); - this.responseTable.put(opaque, responseFuture); - final SocketAddress addr = channel.remoteAddress(); - channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> { - if (f.isSuccess()) { - responseFuture.setSendRequestOK(true); - return; - } - - responseFuture.setSendRequestOK(false); - responseTable.remove(opaque); - responseFuture.setCause(f.cause()); - responseFuture.putResponse(null); - log.warn("Failed to write a request command to {}, caused by underlying I/O operation failure", addr); - }); + return invokeImpl(channel, request, timeoutMillis).thenApply(ResponseFuture::getResponseCommand) + .get(timeoutMillis, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw new RemotingSendRequestException(channel.remoteAddress().toString(), e.getCause()); + } catch (TimeoutException e) { + throw new RemotingTimeoutException(channel.remoteAddress().toString(), timeoutMillis, e.getCause()); + } + } - RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); - if (null == responseCommand) { - if (responseFuture.isSendRequestOK()) { - throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, - responseFuture.getCause()); - } else { - throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); - } + public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final RemotingCommand request, + final long timeoutMillis) { + String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); + doBeforeRpcHooks(channelRemoteAddr, request); + return invoke0(channel, request, timeoutMillis).whenComplete((v, t) -> { + if (t == null) { + doAfterRpcHooks(channelRemoteAddr, request, v.getResponseCommand()); } - - return responseCommand; - } finally { - this.responseTable.remove(opaque); - } + }); } - public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, - final InvokeCallback invokeCallback) - throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + protected CompletableFuture<ResponseFuture> invoke0(final Channel channel, final RemotingCommand request, + final long timeoutMillis) { + CompletableFuture<ResponseFuture> future = new CompletableFuture<>(); long beginStartTime = System.currentTimeMillis(); final int opaque = request.getOpaque(); - boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); + + boolean acquired; + try { + acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + future.completeExceptionally(t); + return future; + } if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { once.release(); - throw new RemotingTimeoutException("invokeAsyncImpl call timeout"); + future.completeExceptionally(new RemotingTimeoutException("invokeAsyncImpl call timeout")); + return future; } - final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once); + AtomicReference<ResponseFuture> responseFutureReference = new AtomicReference<>(); + final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, request, timeoutMillis - costTime, + new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { + future.complete(responseFutureReference.get()); + } + + @Override + public void operationFail(Throwable throwable) { + future.completeExceptionally(throwable); + } + }, once); + responseFutureReference.set(responseFuture); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> { @@ -528,15 +541,17 @@ public abstract class NettyRemotingAbstract { requestFail(opaque); log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); }); + return future; } catch (Exception e) { responseTable.remove(opaque); responseFuture.release(); log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); - throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); + future.completeExceptionally(new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e)); + return future; } } else { if (timeoutMillis <= 0) { - throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); + future.completeExceptionally(new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast")); } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", @@ -545,11 +560,31 @@ public abstract class NettyRemotingAbstract { this.semaphoreAsync.availablePermits() ); log.warn(info); - throw new RemotingTimeoutException(info); + future.completeExceptionally(new RemotingTimeoutException(info)); } + return future; } } + public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, + final InvokeCallback invokeCallback) { + invokeImpl(channel, request, timeoutMillis) + .whenComplete((v, t) -> { + if (t == null) { + invokeCallback.operationComplete(v); + } else { + ResponseFuture responseFuture = new ResponseFuture(channel, request.getOpaque(), request, timeoutMillis, null, null); + responseFuture.setCause(t); + invokeCallback.operationComplete(responseFuture); + } + }) + .thenAccept(responseFuture -> invokeCallback.operationSucceed(responseFuture.getResponseCommand())) + .exceptionally(t -> { + invokeCallback.operationFail(t); + return null; + }); + } + private void requestFail(final int opaque) { ResponseFuture responseFuture = responseTable.remove(opaque); if (responseFuture != null) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 64621dd6c4..d784351a5f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -527,15 +527,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti if (channel != null && channel.isActive()) { long left = timeoutMillis; try { - doBeforeRpcHooks(channelRemoteAddr, request); long costTime = System.currentTimeMillis() - beginStartTime; left -= costTime; if (left <= 0) { throw new RemotingTimeoutException("invokeSync call the addr[" + channelRemoteAddr + "] timeout"); } RemotingCommand response = this.invokeSyncImpl(channel, request, left); - doAfterRpcHooks(channelRemoteAddr, request, response); - this.updateChannelLastResponseTime(addr); + updateChannelLastResponseTime(addr); return response; } catch (RemotingSendRequestException e) { LOGGER.warn("invokeSync: send request exception, so close the channel[{}]", channelRemoteAddr); @@ -727,18 +725,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti final Channel channel = this.getAndCreateChannel(addr); String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); if (channel != null && channel.isActive()) { - try { - doBeforeRpcHooks(channelRemoteAddr, request); - long costTime = System.currentTimeMillis() - beginStartTime; - if (timeoutMillis < costTime) { - throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout"); - } - this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr)); - } catch (RemotingSendRequestException e) { - LOGGER.warn("invokeAsync: send request exception, so close the channel[{}]", channelRemoteAddr); - this.closeChannel(addr, channel); - throw e; + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeoutMillis < costTime) { + throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout"); } + this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr)); } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); @@ -931,11 +922,19 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void operationComplete(ResponseFuture responseFuture) { - if (responseFuture != null && responseFuture.isSendRequestOK() && responseFuture.getResponseCommand() != null) { - NettyRemotingClient.this.updateChannelLastResponseTime(addr); - } this.invokeCallback.operationComplete(responseFuture); } + + @Override + public void operationSucceed(RemotingCommand response) { + updateChannelLastResponseTime(addr); + this.invokeCallback.operationSucceed(response); + } + + @Override + public void operationFail(final Throwable throwable) { + this.invokeCallback.operationFail(throwable); + } } class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java index 19f705d74b..0882818fea 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java @@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class ResponseFuture { @@ -59,6 +62,18 @@ public class ResponseFuture { public void executeInvokeCallback() { if (invokeCallback != null) { if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) { + RemotingCommand response = getResponseCommand(); + if (response != null) { + invokeCallback.operationSucceed(response); + } else { + if (!isSendRequestOK()) { + invokeCallback.operationFail(new RemotingSendRequestException(channel.remoteAddress().toString(), getCause())); + } else if (isTimeout()) { + invokeCallback.operationFail(new RemotingTimeoutException(channel.remoteAddress().toString(), getTimeoutMillis(), getCause())); + } else { + invokeCallback.operationFail(new RemotingException(getRequestCommand().toString(), getCause())); + } + } invokeCallback.operationComplete(this); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java index 133e0ed314..5328e8845d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java @@ -160,31 +160,38 @@ public class RpcClientImpl implements RpcClient { InvokeCallback callback = new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { - RemotingCommand responseCommand = responseFuture.getResponseCommand(); - if (responseCommand == null) { - processFailedResponse(addr, requestCommand, responseFuture, rpcResponsePromise); - return; - } + + } + + @Override + public void operationSucceed(RemotingCommand response) { try { - switch (responseCommand.getCode()) { + switch (response.getCode()) { case ResponseCode.SUCCESS: case ResponseCode.PULL_NOT_FOUND: case ResponseCode.PULL_RETRY_IMMEDIATELY: case ResponseCode.PULL_OFFSET_MOVED: PullMessageResponseHeader responseHeader = - (PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class); - rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); + (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); + rpcResponsePromise.setSuccess(new RpcResponse(response.getCode(), responseHeader, response.getBody())); default: - RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code")); + RpcResponse rpcResponse = new RpcResponse(new RpcException(response.getCode(), "unexpected remote response code")); rpcResponsePromise.setSuccess(rpcResponse); } } catch (Exception e) { - String errorMessage = "process failed. addr: " + addr + ", timeoutMillis: " + responseFuture.getTimeoutMillis() + ". Request: " + requestCommand; - RpcResponse rpcResponse = new RpcResponse(new RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, e)); + String errorMessage = "process failed. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + requestCommand; + RpcResponse rpcResponse = new RpcResponse(new RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, e)); rpcResponsePromise.setSuccess(rpcResponse); } } + + @Override + public void operationFail(Throwable throwable) { + String errorMessage = "process failed. addr: " + addr + ". Request: " + requestCommand; + RpcResponse rpcResponse = new RpcResponse(new RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, throwable)); + rpcResponsePromise.setSuccess(rpcResponse); + } }; this.remotingClient.invokeAsync(addr, requestCommand, timeoutMillis, callback); diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java index 90072960b5..d0da0eb2ef 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java @@ -26,12 +26,12 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.netty.ResponseFuture; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.AfterClass; @@ -40,7 +40,6 @@ import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; public class RemotingServerTest { private static RemotingServer remotingServer; @@ -122,10 +121,19 @@ public class RemotingServerTest { remotingClient.invokeAsync("localhost:" + remotingServer.localListenPort(), request, 1000 * 3, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { latch.countDown(); - assertTrue(responseFuture != null); - assertThat(responseFuture.getResponseCommand().getLanguage()).isEqualTo(LanguageCode.JAVA); - assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2); + assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA); + assertThat(response.getExtFields()).hasSize(2); + } + + @Override + public void operationFail(Throwable throwable) { + } }); latch.await(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannel.java similarity index 57% rename from client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java rename to remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannel.java index 80188832eb..8ddcdf35df 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannel.java @@ -15,23 +15,14 @@ * limitations under the License. */ -package org.apache.rocketmq.client.impl; +package org.apache.rocketmq.remoting.netty; -import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.netty.ResponseFuture; - -public abstract class BaseInvokeCallback implements InvokeCallback { - private final MQClientAPIImpl mqClientAPI; - - public BaseInvokeCallback(MQClientAPIImpl mqClientAPI) { - this.mqClientAPI = mqClientAPI; - } +import io.netty.channel.ChannelFuture; +import io.netty.channel.local.LocalChannel; +public class MockChannel extends LocalChannel { @Override - public void operationComplete(final ResponseFuture responseFuture) { - mqClientAPI.execRpcHooksAfterRequest(responseFuture); - onComplete(responseFuture); + public ChannelFuture writeAndFlush(Object msg) { + return new MockChannelPromise(MockChannel.this); } - - public abstract void onComplete(final ResponseFuture responseFuture); } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannelPromise.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannelPromise.java new file mode 100644 index 0000000000..9c3a354871 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannelPromise.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.jetbrains.annotations.NotNull; + +public class MockChannelPromise implements ChannelPromise { + protected Channel channel; + + public MockChannelPromise(Channel channel) { + this.channel = channel; + } + + @Override + public Channel channel() { + return channel; + } + + @Override + public ChannelPromise setSuccess(Void result) { + return this; + } + + @Override + public ChannelPromise setSuccess() { + return this; + } + + @Override + public boolean trySuccess() { + return false; + } + + @Override + public ChannelPromise setFailure(Throwable cause) { + return this; + } + + @Override + public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) { + return this; + } + + @Override + public ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { + return this; + } + + @Override + public ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) { + return this; + } + + @Override + public ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { + return this; + } + + @Override + public ChannelPromise sync() throws InterruptedException { + return this; + } + + @Override + public ChannelPromise syncUninterruptibly() { + return this; + } + + @Override + public ChannelPromise await() throws InterruptedException { + return this; + } + + @Override + public ChannelPromise awaitUninterruptibly() { + return this; + } + + @Override + public ChannelPromise unvoid() { + return this; + } + + @Override + public boolean isVoid() { + return false; + } + + @Override + public boolean trySuccess(Void result) { + return false; + } + + @Override + public boolean tryFailure(Throwable cause) { + return false; + } + + @Override + public boolean setUncancellable() { + return false; + } + + @Override + public boolean isSuccess() { + return false; + } + + @Override + public boolean isCancellable() { + return false; + } + + @Override + public Throwable cause() { + return null; + } + + @Override + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + @Override + public boolean await(long timeoutMillis) throws InterruptedException { + return false; + } + + @Override + public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { + return false; + } + + @Override + public boolean awaitUninterruptibly(long timeoutMillis) { + return false; + } + + @Override + public Void getNow() { + return null; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + return null; + } + + @Override + public Void get(long timeout, + @NotNull java.util.concurrent.TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return null; + } +} diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java index 8381c132b7..dbbea86ea2 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java @@ -39,9 +39,19 @@ public class NettyRemotingAbstractTest { final Semaphore semaphore = new Semaphore(0); ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new InvokeCallback() { @Override - public void operationComplete(final ResponseFuture responseFuture) { + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { assertThat(semaphore.availablePermits()).isEqualTo(0); } + + @Override + public void operationFail(Throwable throwable) { + + } }, new SemaphoreReleaseOnlyOnce(semaphore)); remotingAbstract.responseTable.putIfAbsent(1, responseFuture); @@ -75,9 +85,19 @@ public class NettyRemotingAbstractTest { final Semaphore semaphore = new Semaphore(0); ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new InvokeCallback() { @Override - public void operationComplete(final ResponseFuture responseFuture) { + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { assertThat(semaphore.availablePermits()).isEqualTo(0); } + + @Override + public void operationFail(Throwable throwable) { + + } }, new SemaphoreReleaseOnlyOnce(semaphore)); remotingAbstract.responseTable.putIfAbsent(1, responseFuture); @@ -98,7 +118,18 @@ public class NettyRemotingAbstractTest { // mock timeout ResponseFuture responseFuture = new ResponseFuture(null, dummyId, -1000, new InvokeCallback() { @Override - public void operationComplete(final ResponseFuture responseFuture) { + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { + + } + + @Override + public void operationFail(Throwable throwable) { + } }, null); remotingAbstract.responseTable.putIfAbsent(dummyId, responseFuture); @@ -111,7 +142,22 @@ public class NettyRemotingAbstractTest { final Semaphore semaphore = new Semaphore(0); RemotingCommand request = RemotingCommand.createRequestCommand(1, null); ResponseFuture responseFuture = new ResponseFuture(null, 1, request, 3000, - responseFuture1 -> assertThat(semaphore.availablePermits()).isEqualTo(0), new SemaphoreReleaseOnlyOnce(semaphore)); + new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } + + @Override + public void operationSucceed(RemotingCommand response) { + assertThat(semaphore.availablePermits()).isEqualTo(0); + } + + @Override + public void operationFail(Throwable throwable) { + + } + }, new SemaphoreReleaseOnlyOnce(semaphore)); remotingAbstract.responseTable.putIfAbsent(1, responseFuture); RemotingCommand response = RemotingCommand.createResponseCommand(0, "Foo"); diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java index 8fabbb21d0..e72e7bd53e 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java @@ -16,10 +16,17 @@ */ package org.apache.rocketmq.remoting.netty; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.local.LocalChannel; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; @@ -29,23 +36,33 @@ import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; @RunWith(MockitoJUnitRunner.class) public class NettyRemotingClientTest { @Spy private NettyRemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig()); + @Mock + private RPCHook rpcHookMock; @Test - public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException { + public void testSetCallbackExecutor() { ExecutorService customized = Executors.newCachedThreadPool(); remotingClient.setCallbackExecutor(customized); assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized); @@ -61,7 +78,7 @@ public class NettyRemotingClientTest { InvokeCallback callback = invocation.getArgument(3); ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); responseFuture.setResponseCommand(response); - callback.operationComplete(responseFuture); + callback.operationSucceed(responseFuture.getResponseCommand()); return null; }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); @@ -78,9 +95,7 @@ public class NettyRemotingClientTest { response.setCode(ResponseCode.SUCCESS); doAnswer(invocation -> { InvokeCallback callback = invocation.getArgument(3); - ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); - responseFuture.setSendRequestOK(false); - callback.operationComplete(responseFuture); + callback.operationFail(new RemotingSendRequestException(null)); return null; }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); @@ -97,8 +112,7 @@ public class NettyRemotingClientTest { response.setCode(ResponseCode.SUCCESS); doAnswer(invocation -> { InvokeCallback callback = invocation.getArgument(3); - ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), -1L, null, null); - callback.operationComplete(responseFuture); + callback.operationFail(new RemotingTimeoutException("")); return null; }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); @@ -115,8 +129,7 @@ public class NettyRemotingClientTest { response.setCode(ResponseCode.SUCCESS); doAnswer(invocation -> { InvokeCallback callback = invocation.getArgument(3); - ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); - callback.operationComplete(responseFuture); + callback.operationFail(new RemotingException(null)); return null; }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); @@ -134,4 +147,158 @@ public class NettyRemotingClientTest { assertThat(e.getMessage()).contains(addr); } } + + @Test + public void testInvoke0() throws ExecutionException, InterruptedException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + Channel channel = new MockChannel() { + @Override + public ChannelFuture writeAndFlush(Object msg) { + ResponseFuture responseFuture = remotingClient.responseTable.get(request.getOpaque()); + responseFuture.setResponseCommand(response); + responseFuture.executeInvokeCallback(); + return super.writeAndFlush(msg); + } + }; + CompletableFuture<ResponseFuture> future = remotingClient.invoke0(channel, request, 1000L); + assertThat(future.get().getResponseCommand()).isEqualTo(response); + } + + @Test + public void testInvoke0WithException() { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + Channel channel = new MockChannel() { + @Override + public ChannelFuture writeAndFlush(Object msg) { + ResponseFuture responseFuture = remotingClient.responseTable.get(request.getOpaque()); + responseFuture.executeInvokeCallback(); + return super.writeAndFlush(msg); + } + }; + CompletableFuture<ResponseFuture> future = remotingClient.invoke0(channel, request, 1000L); + assertThatThrownBy(future::get).getCause().isInstanceOf(RemotingException.class); + } + + @Test + public void testInvokeSync() throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { + remotingClient.registerRPCHook(rpcHookMock); + + Channel channel = new LocalChannel(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + ResponseFuture responseFuture = new ResponseFuture(channel, request.getOpaque(), request, 1000, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } + }, new SemaphoreReleaseOnlyOnce(new Semaphore(1))); + responseFuture.setResponseCommand(response); + CompletableFuture<ResponseFuture> future = new CompletableFuture<>(); + future.complete(responseFuture); + + doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong()); + RemotingCommand actual = remotingClient.invokeSyncImpl(channel, request, 1000); + assertThat(actual).isEqualTo(response); + + verify(rpcHookMock).doBeforeRequest(anyString(), eq(request)); + verify(rpcHookMock).doAfterResponse(anyString(), eq(request), eq(response)); + } + + @Test + public void testInvokeAsync() { + remotingClient.registerRPCHook(rpcHookMock); + Channel channel = new LocalChannel(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + ResponseFuture responseFuture = new ResponseFuture(channel, request.getOpaque(), request, 1000, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } + }, new SemaphoreReleaseOnlyOnce(new Semaphore(1))); + responseFuture.setResponseCommand(response); + CompletableFuture<ResponseFuture> future = new CompletableFuture<>(); + future.complete(responseFuture); + + doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong()); + + InvokeCallback callback = mock(InvokeCallback.class); + remotingClient.invokeAsyncImpl(channel, request, 1000, callback); + verify(callback, times(1)).operationSucceed(eq(response)); + verify(callback, times(1)).operationComplete(eq(responseFuture)); + verify(callback, never()).operationFail(any()); + + verify(rpcHookMock).doBeforeRequest(anyString(), eq(request)); + verify(rpcHookMock).doAfterResponse(anyString(), eq(request), eq(response)); + } + + @Test + public void testInvokeAsyncFail() { + remotingClient.registerRPCHook(rpcHookMock); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + + Channel channel = new LocalChannel(); + CompletableFuture<ResponseFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new RemotingException(null)); + + doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong()); + + InvokeCallback callback = mock(InvokeCallback.class); + remotingClient.invokeAsyncImpl(channel, request, 1000, callback); + verify(callback, never()).operationSucceed(any()); + verify(callback, times(1)).operationComplete(any()); + verify(callback, times(1)).operationFail(any()); + + verify(rpcHookMock).doBeforeRequest(anyString(), eq(request)); + verify(rpcHookMock, never()).doAfterResponse(anyString(), eq(request), any()); + } + + @Test + public void testInvokeImpl() throws ExecutionException, InterruptedException { + remotingClient.registerRPCHook(rpcHookMock); + Channel channel = new LocalChannel(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + ResponseFuture responseFuture = new ResponseFuture(channel, request.getOpaque(), request, 1000, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + + } + }, new SemaphoreReleaseOnlyOnce(new Semaphore(1))); + responseFuture.setResponseCommand(response); + CompletableFuture<ResponseFuture> future = new CompletableFuture<>(); + future.complete(responseFuture); + + doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong()); + + CompletableFuture<ResponseFuture> future0 = remotingClient.invokeImpl(channel, request, 1000); + assertThat(future0.get()).isEqualTo(responseFuture); + + verify(rpcHookMock).doBeforeRequest(anyString(), eq(request)); + verify(rpcHookMock).doAfterResponse(anyString(), eq(request), eq(response)); + } + + @Test + public void testInvokeImplFail() { + remotingClient.registerRPCHook(rpcHookMock); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + + Channel channel = new LocalChannel(); + CompletableFuture<ResponseFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new RemotingException(null)); + + doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong()); + + assertThatThrownBy(() -> remotingClient.invokeImpl(channel, request, 1000).get()).getCause().isInstanceOf(RemotingException.class); + + verify(rpcHookMock).doBeforeRequest(anyString(), eq(request)); + verify(rpcHookMock, never()).doAfterResponse(anyString(), eq(request), any()); + } }