This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 66ba4566f5 close channel when receive go away twice (#8862)
66ba4566f5 is described below

commit 66ba4566f5ebaeac47c73eaaf4a86567e3760063
Author: qianye <wuxingcan....@alibaba-inc.com>
AuthorDate: Thu Nov 14 14:07:20 2024 +0800

    close channel when receive go away twice (#8862)
    
    close channel when receive go away twice (#8862)
---
 .../rocketmq/remoting/common/RemotingHelper.java   | 30 ++++++++----
 .../rocketmq/remoting/netty/NettyClientConfig.java | 10 ----
 .../remoting/netty/NettyRemotingAbstract.java      |  2 +-
 .../remoting/netty/NettyRemotingClient.java        | 53 ++++++++--------------
 4 files changed, 41 insertions(+), 54 deletions(-)

diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 552fd2b15f..d94efe71e4 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -21,6 +21,15 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.util.Attribute;
 import io.netty.util.AttributeKey;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.NetworkUtil;
@@ -36,15 +45,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.HashMap;
-import java.util.Map;
-
 public class RemotingHelper {
     public static final String DEFAULT_CHARSET = "UTF-8";
     public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
@@ -355,6 +355,18 @@ public class RemotingHelper {
         }
     }
 
+    public static CompletableFuture<Void> 
convertChannelFutureToCompletableFuture(ChannelFuture channelFuture) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        channelFuture.addListener((ChannelFutureListener) future -> {
+            if (future.isSuccess()) {
+                completableFuture.complete(null);
+            } else {
+                completableFuture.completeExceptionally(new 
RemotingConnectException(channelFuture.channel().remoteAddress().toString(), 
future.cause()));
+            }
+        });
+        return completableFuture;
+    }
+
     public static String getRequestCodeDesc(int code) {
         return REQUEST_CODE_MAP.getOrDefault(code, String.valueOf(code));
     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index 7b7263e27a..8260163640 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -59,8 +59,6 @@ public class NettyClientConfig {
 
     private boolean enableReconnectForGoAway = true;
 
-    private boolean enableTransparentRetry = true;
-
     public boolean isClientCloseSocketIfTimeout() {
         return clientCloseSocketIfTimeout;
     }
@@ -205,14 +203,6 @@ public class NettyClientConfig {
         this.enableReconnectForGoAway = enableReconnectForGoAway;
     }
 
-    public boolean isEnableTransparentRetry() {
-        return enableTransparentRetry;
-    }
-
-    public void setEnableTransparentRetry(boolean enableTransparentRetry) {
-        this.enableTransparentRetry = enableTransparentRetry;
-    }
-
     public String getSocksProxyConfig() {
         return socksProxyConfig;
     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index ffa3726059..b0c7099b9d 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -273,7 +273,7 @@ public abstract class NettyRemotingAbstract {
         Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque);
 
         if (isShuttingDown.get()) {
-            if (cmd.getVersion() > MQVersion.Version.V5_1_4.ordinal()) {
+            if (cmd.getVersion() > MQVersion.Version.V5_3_1.ordinal()) {
                 final RemotingCommand response = 
RemotingCommand.createResponseCommand(ResponseCode.GO_AWAY,
                     "please go away");
                 response.setOpaque(opaque);
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index ae82b09eda..b3042c9f8d 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -73,6 +73,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.FutureUtils;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -88,6 +89,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.proxy.SocksProxyConfig;
 
+import static 
org.apache.rocketmq.remoting.common.RemotingHelper.convertChannelFutureToCompletableFuture;
+
 public class NettyRemotingClient extends NettyRemotingAbstract implements 
RemotingClient {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
 
@@ -554,7 +557,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                 updateChannelLastResponseTime(addr);
                 return response;
             } catch (RemotingSendRequestException e) {
-                LOGGER.warn("invokeSync: send request exception, so close the 
channel[{}]", channelRemoteAddr);
+                LOGGER.warn("invokeSync: send request exception, so close the 
channel[addr={}, id={}]", channelRemoteAddr, channel.id());
                 this.closeChannel(addr, channel);
                 throw e;
             } catch (RemotingTimeoutException e) {
@@ -832,45 +835,27 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                         return channelWrapper0;
                     });
                     if (channelWrapper != null && 
!channelWrapper.isWrapperOf(channel)) {
-                        if (nettyClientConfig.isEnableTransparentRetry()) {
-                            RemotingCommand retryRequest = 
RemotingCommand.createRequestCommand(request.getCode(), 
request.readCustomHeader());
-                            retryRequest.setBody(request.getBody());
-                            retryRequest.setExtFields(request.getExtFields());
-                            if (channelWrapper.isOK()) {
-                                long duration = 
stopwatch.elapsed(TimeUnit.MILLISECONDS);
-                                stopwatch.stop();
-                                Channel retryChannel = 
channelWrapper.getChannel();
-                                if (retryChannel != null && channel != 
retryChannel) {
-                                    return super.invokeImpl(retryChannel, 
retryRequest, timeoutMillis - duration);
-                                }
-                            } else {
-                                CompletableFuture<ResponseFuture> future = new 
CompletableFuture<>();
-                                ChannelFuture channelFuture = 
channelWrapper.getChannelFuture();
-                                channelFuture.addListener(f -> {
-                                    long duration = 
stopwatch.elapsed(TimeUnit.MILLISECONDS);
-                                    stopwatch.stop();
-                                    if (f.isSuccess()) {
-                                        Channel retryChannel0 = 
channelFuture.channel();
-                                        if (retryChannel0 != null && channel 
!= retryChannel0) {
-                                            super.invokeImpl(retryChannel0, 
retryRequest, timeoutMillis - duration).whenComplete((v, t) -> {
-                                                if (t != null) {
-                                                    
future.completeExceptionally(t);
-                                                } else {
-                                                    future.complete(v);
-                                                }
-                                            });
-                                        }
-                                    } else {
-                                        future.completeExceptionally(new 
RemotingConnectException(channelWrapper.channelAddress));
+                        RemotingCommand retryRequest = 
RemotingCommand.createRequestCommand(request.getCode(), 
request.readCustomHeader());
+                        retryRequest.setBody(request.getBody());
+                        retryRequest.setExtFields(request.getExtFields());
+                        CompletableFuture<Void> future = 
convertChannelFutureToCompletableFuture(channelWrapper.getChannelFuture());
+                        return future.thenCompose(v -> {
+                            long duration = 
stopwatch.elapsed(TimeUnit.MILLISECONDS);
+                            stopwatch.stop();
+                            return 
super.invokeImpl(channelWrapper.getChannel(), retryRequest, timeoutMillis - 
duration)
+                                .thenCompose(r -> {
+                                    if (r.getResponseCommand().getCode() == 
ResponseCode.GO_AWAY) {
+                                        return 
FutureUtils.completeExceptionally(new 
RemotingSendRequestException(channelRemoteAddr,
+                                            new Throwable("Receive GO_AWAY 
twice in request from channelId=" + channel.id())));
                                     }
+                                    return 
CompletableFuture.completedFuture(r);
                                 });
-                                return future;
-                            }
-                        }
+                        });
                     } else {
                         LOGGER.warn("invokeImpl receive GO_AWAY, 
channelWrapper is null or channel is the same in wrapper, channelId={}", 
channel.id());
                     }
                 }
+                return FutureUtils.completeExceptionally(new 
RemotingSendRequestException(channelRemoteAddr, new Throwable("Receive GO_AWAY 
from channelId=" + channel.id())));
             }
             return CompletableFuture.completedFuture(responseFuture);
         }).whenComplete((v, t) -> {

Reply via email to