This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 7dc0e5aed8 [ISSUE #7833] Fix invokeImpl() in RemotingAbstract 7dc0e5aed8 is described below commit 7dc0e5aed83f738f5554cb5f00e0da5dabc06e04 Author: guyinyou <36399867+guyin...@users.noreply.github.com> AuthorDate: Mon Feb 19 17:33:01 2024 +0800 [ISSUE #7833] Fix invokeImpl() in RemotingAbstract --- .../org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java | 8 +------- .../org/apache/rocketmq/remoting/netty/NettyRemotingClient.java | 7 +++++++ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 62a8a72901..235349fce3 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -496,13 +496,7 @@ public abstract class NettyRemotingAbstract { public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) { - String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); - doBeforeRpcHooks(channelRemoteAddr, request); - return invoke0(channel, request, timeoutMillis).whenComplete((v, t) -> { - if (t == null) { - doAfterRpcHooks(channelRemoteAddr, request, v.getResponseCommand()); - } - }); + return invoke0(channel, request, timeoutMillis); } protected CompletableFuture<ResponseFuture> invoke0(final Channel channel, final RemotingCommand request, diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index f5157d0304..925c4f9cb2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -804,6 +804,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) { Stopwatch stopwatch = Stopwatch.createStarted(); + String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); + doBeforeRpcHooks(channelRemoteAddr, request); + return super.invokeImpl(channel, request, timeoutMillis).thenCompose(responseFuture -> { RemotingCommand response = responseFuture.getResponseCommand(); if (response.getCode() == ResponseCode.GO_AWAY) { @@ -839,6 +842,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } return CompletableFuture.completedFuture(responseFuture); + }).whenComplete((v, t) -> { + if (t == null) { + doAfterRpcHooks(channelRemoteAddr, request, v.getResponseCommand()); + } }); }