This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d73b601382 [ISSUE #7330] Fix channel connect issue for goaway (#7467)
d73b601382 is described below

commit d73b6013825db9124e39a37db67094e34b9c3d88
Author: Zhouxiang Zhan <zhouxz...@apache.org>
AuthorDate: Mon Oct 16 19:06:40 2023 +0800

    [ISSUE #7330] Fix channel connect issue for goaway (#7467)
    
    * add waitChannelFuture for goaway
    
    * add body for retry channel
---
 .../remoting/netty/NettyRemotingClient.java        | 41 +++++++++++++++-------
 1 file changed, 28 insertions(+), 13 deletions(-)

diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 4bc51bd833..340daee67e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -716,20 +716,25 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         }
 
         if (cw != null) {
-            ChannelFuture channelFuture = cw.getChannelFuture();
-            if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
-                if (cw.isOK()) {
-                    LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
-                    return cw.getChannel();
-                } else {
-                    LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString());
-                }
+            return waitChannelFuture(addr, cw);
+        }
+
+        return null;
+    }
+
+    private Channel waitChannelFuture(String addr, ChannelWrapper cw) {
+        ChannelFuture channelFuture = cw.getChannelFuture();
+        if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
+            if (cw.isOK()) {
+                LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+                return cw.getChannel();
             } else {
-                LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
-                    channelFuture.toString());
+                LOGGER.warn("createChannel: connect remote host[{}] failed, {}", addr, channelFuture.toString());
             }
+        } else {
+            LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
+                channelFuture.toString());
         }
-
         return null;
     }
 
@@ -818,8 +823,14 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                             long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
                             stopwatch.stop();
                             RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
-                            Channel retryChannel = channelWrapper.getChannel();
-                            if (channel != retryChannel) {
+                            retryRequest.setBody(request.getBody());
+                            Channel retryChannel;
+                            if (channelWrapper.isOK()) {
+                                retryChannel = channelWrapper.getChannel();
+                            } else {
+                                retryChannel = waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper);
+                            }
+                            if (retryChannel != null && channel != retryChannel) {
                                 return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
                             }
                         }
@@ -994,6 +1005,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             this.lastResponseTime = System.currentTimeMillis();
         }
 
+        public String getChannelAddress() {
+            return channelAddress;
+        }
+
         public boolean reconnect() {
             if (lock.writeLock().tryLock()) {
                 try {

Reply via email to