This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 66ba4566f5 close channel when receive go away twice (#8862) 66ba4566f5 is described below commit 66ba4566f5ebaeac47c73eaaf4a86567e3760063 Author: qianye <wuxingcan....@alibaba-inc.com> AuthorDate: Thu Nov 14 14:07:20 2024 +0800 close channel when receive go away twice (#8862) close channel when receive go away twice (#8862) --- .../rocketmq/remoting/common/RemotingHelper.java | 30 ++++++++---- .../rocketmq/remoting/netty/NettyClientConfig.java | 10 ---- .../remoting/netty/NettyRemotingAbstract.java | 2 +- .../remoting/netty/NettyRemotingClient.java | 53 ++++++++-------------- 4 files changed, 41 insertions(+), 54 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java index 552fd2b15f..d94efe71e4 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java @@ -21,6 +21,15 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.util.Attribute; import io.netty.util.AttributeKey; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.NetworkUtil; @@ -36,15 +45,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; -import java.io.IOException; -import java.lang.reflect.Field; -import 
java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.HashMap; -import java.util.Map; - public class RemotingHelper { public static final String DEFAULT_CHARSET = "UTF-8"; public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0"; @@ -355,6 +355,18 @@ public class RemotingHelper { } } + public static CompletableFuture<Void> convertChannelFutureToCompletableFuture(ChannelFuture channelFuture) { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + channelFuture.addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + completableFuture.complete(null); + } else { + completableFuture.completeExceptionally(new RemotingConnectException(channelFuture.channel().remoteAddress().toString(), future.cause())); + } + }); + return completableFuture; + } + public static String getRequestCodeDesc(int code) { return REQUEST_CODE_MAP.getOrDefault(code, String.valueOf(code)); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java index 7b7263e27a..8260163640 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java @@ -59,8 +59,6 @@ public class NettyClientConfig { private boolean enableReconnectForGoAway = true; - private boolean enableTransparentRetry = true; - public boolean isClientCloseSocketIfTimeout() { return clientCloseSocketIfTimeout; } @@ -205,14 +203,6 @@ public class NettyClientConfig { this.enableReconnectForGoAway = enableReconnectForGoAway; } - public boolean isEnableTransparentRetry() { - return enableTransparentRetry; - } - - public void setEnableTransparentRetry(boolean enableTransparentRetry) { - this.enableTransparentRetry = enableTransparentRetry; - } - public String 
getSocksProxyConfig() { return socksProxyConfig; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index ffa3726059..b0c7099b9d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -273,7 +273,7 @@ public abstract class NettyRemotingAbstract { Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque); if (isShuttingDown.get()) { - if (cmd.getVersion() > MQVersion.Version.V5_1_4.ordinal()) { + if (cmd.getVersion() > MQVersion.Version.V5_3_1.ordinal()) { final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.GO_AWAY, "please go away"); response.setOpaque(opaque); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index ae82b09eda..b3042c9f8d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -73,6 +73,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.FutureUtils; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -88,6 +89,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.proxy.SocksProxyConfig; +import static 
org.apache.rocketmq.remoting.common.RemotingHelper.convertChannelFutureToCompletableFuture; + public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); @@ -554,7 +557,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti updateChannelLastResponseTime(addr); return response; } catch (RemotingSendRequestException e) { - LOGGER.warn("invokeSync: send request exception, so close the channel[{}]", channelRemoteAddr); + LOGGER.warn("invokeSync: send request exception, so close the channel[addr={}, id={}]", channelRemoteAddr, channel.id()); this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { @@ -832,45 +835,27 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return channelWrapper0; }); if (channelWrapper != null && !channelWrapper.isWrapperOf(channel)) { - if (nettyClientConfig.isEnableTransparentRetry()) { - RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader()); - retryRequest.setBody(request.getBody()); - retryRequest.setExtFields(request.getExtFields()); - if (channelWrapper.isOK()) { - long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); - stopwatch.stop(); - Channel retryChannel = channelWrapper.getChannel(); - if (retryChannel != null && channel != retryChannel) { - return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration); - } - } else { - CompletableFuture<ResponseFuture> future = new CompletableFuture<>(); - ChannelFuture channelFuture = channelWrapper.getChannelFuture(); - channelFuture.addListener(f -> { - long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); - stopwatch.stop(); - if (f.isSuccess()) { - Channel retryChannel0 = channelFuture.channel(); - if (retryChannel0 != null && channel != retryChannel0) { - super.invokeImpl(retryChannel0, 
retryRequest, timeoutMillis - duration).whenComplete((v, t) -> { - if (t != null) { - future.completeExceptionally(t); - } else { - future.complete(v); - } - }); - } - } else { - future.completeExceptionally(new RemotingConnectException(channelWrapper.channelAddress)); + RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader()); + retryRequest.setBody(request.getBody()); + retryRequest.setExtFields(request.getExtFields()); + CompletableFuture<Void> future = convertChannelFutureToCompletableFuture(channelWrapper.getChannelFuture()); + return future.thenCompose(v -> { + long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); + stopwatch.stop(); + return super.invokeImpl(channelWrapper.getChannel(), retryRequest, timeoutMillis - duration) + .thenCompose(r -> { + if (r.getResponseCommand().getCode() == ResponseCode.GO_AWAY) { + return FutureUtils.completeExceptionally(new RemotingSendRequestException(channelRemoteAddr, + new Throwable("Receive GO_AWAY twice in request from channelId=" + channel.id()))); } + return CompletableFuture.completedFuture(r); }); - return future; - } - } + }); } else { LOGGER.warn("invokeImpl receive GO_AWAY, channelWrapper is null or channel is the same in wrapper, channelId={}", channel.id()); } } + return FutureUtils.completeExceptionally(new RemotingSendRequestException(channelRemoteAddr, new Throwable("Receive GO_AWAY from channelId=" + channel.id()))); } return CompletableFuture.completedFuture(responseFuture); }).whenComplete((v, t) -> {