This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3ee7bc25ce [ISSUE #7815] Use createChannelAsync for async invoke rpc 
(#7816)
3ee7bc25ce is described below

commit 3ee7bc25cec0614ae36b0c935a37abb4ef589916
Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com>
AuthorDate: Tue Feb 20 15:56:02 2024 +0800

    [ISSUE #7815] Use createChannelAsync for async invoke rpc (#7816)
    
    * async
---
 .../remoting/netty/NettyRemotingClient.java        | 227 ++++++++++++---------
 1 file changed, 126 insertions(+), 101 deletions(-)

diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 925c4f9cb2..836910f8fa 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -613,25 +613,33 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         }
     }
 
-    private Channel getAndCreateChannel(final String addr) throws 
InterruptedException {
+    private ChannelFuture getAndCreateChannelAsync(final String addr) throws 
InterruptedException {
         if (null == addr) {
-            return getAndCreateNameserverChannel();
+            return getAndCreateNameserverChannelAsync();
         }
 
         ChannelWrapper cw = this.channelTables.get(addr);
         if (cw != null && cw.isOK()) {
-            return cw.getChannel();
+            return cw.getChannelFuture();
         }
 
-        return this.createChannel(addr);
+        return this.createChannelAsync(addr);
+    }
+
+    private Channel getAndCreateChannel(final String addr) throws 
InterruptedException {
+        ChannelFuture channelFuture = getAndCreateChannelAsync(addr);
+        if (channelFuture == null) {
+            return null;
+        }
+        return getAndCreateChannelAsync(addr).awaitUninterruptibly().channel();
     }
 
-    private Channel getAndCreateNameserverChannel() throws 
InterruptedException {
+    private ChannelFuture getAndCreateNameserverChannelAsync() throws 
InterruptedException {
         String addr = this.namesrvAddrChoosed.get();
         if (addr != null) {
             ChannelWrapper cw = this.channelTables.get(addr);
             if (cw != null && cw.isOK()) {
-                return cw.getChannel();
+                return cw.getChannelFuture();
             }
         }
 
@@ -642,25 +650,19 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                 if (addr != null) {
                     ChannelWrapper cw = this.channelTables.get(addr);
                     if (cw != null && cw.isOK()) {
-                        return cw.getChannel();
+                        return cw.getChannelFuture();
                     }
                 }
 
                 if (addrList != null && !addrList.isEmpty()) {
-                    for (int i = 0; i < addrList.size(); i++) {
-                        int index = this.namesrvIndex.incrementAndGet();
-                        index = Math.abs(index);
-                        index = index % addrList.size();
-                        String newAddr = addrList.get(index);
-
-                        this.namesrvAddrChoosed.set(newAddr);
-                        LOGGER.info("new name server is chosen. OLD: {} , NEW: 
{}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
-                        Channel channelNew = this.createChannel(newAddr);
-                        if (channelNew != null) {
-                            return channelNew;
-                        }
-                    }
-                    throw new RemotingConnectException(addrList.toString());
+                    int index = this.namesrvIndex.incrementAndGet();
+                    index = Math.abs(index);
+                    index = index % addrList.size();
+                    String newAddr = addrList.get(index);
+
+                    this.namesrvAddrChoosed.set(newAddr);
+                    LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. 
namesrvIndex = {}", addr, newAddr, namesrvIndex);
+                    return this.createChannelAsync(newAddr);
                 }
             } catch (Exception e) {
                 LOGGER.error("getAndCreateNameserverChannel: create name 
server channel exception", e);
@@ -674,39 +676,23 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         return null;
     }
 
-    private Channel createChannel(final String addr) throws 
InterruptedException {
+    private ChannelFuture createChannelAsync(final String addr) throws 
InterruptedException {
         ChannelWrapper cw = this.channelTables.get(addr);
         if (cw != null && cw.isOK()) {
-            return cw.getChannel();
+            return cw.getChannelFuture();
         }
 
         if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS)) {
             try {
-                boolean createNewConnection;
                 cw = this.channelTables.get(addr);
                 if (cw != null) {
-
-                    if (cw.isOK()) {
-                        return cw.getChannel();
-                    } else if (!cw.getChannelFuture().isDone()) {
-                        createNewConnection = false;
+                    if (cw.isOK() || !cw.getChannelFuture().isDone()) {
+                        return cw.getChannelFuture();
                     } else {
                         this.channelTables.remove(addr);
-                        createNewConnection = true;
                     }
-                } else {
-                    createNewConnection = true;
-                }
-
-                if (createNewConnection) {
-                    String[] hostAndPort = getHostAndPort(addr);
-                    ChannelFuture channelFuture = fetchBootstrap(addr)
-                        .connect(hostAndPort[0], 
Integer.parseInt(hostAndPort[1]));
-                    LOGGER.info("createChannel: begin to connect remote 
host[{}] asynchronously", addr);
-                    cw = new ChannelWrapper(addr, channelFuture);
-                    this.channelTables.put(addr, cw);
-                    this.channelWrapperTables.put(channelFuture.channel(), cw);
                 }
+                return createChannel(addr).getChannelFuture();
             } catch (Exception e) {
                 LOGGER.error("createChannel: create channel exception", e);
             } finally {
@@ -716,27 +702,18 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
             LOGGER.warn("createChannel: try to lock channel table, but 
timeout, {}ms", LOCK_TIMEOUT_MILLIS);
         }
 
-        if (cw != null) {
-            return waitChannelFuture(addr, cw);
-        }
-
         return null;
     }
 
-    private Channel waitChannelFuture(String addr, ChannelWrapper cw) {
-        ChannelFuture channelFuture = cw.getChannelFuture();
-        if 
(channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis()))
 {
-            if (cw.isOK()) {
-                LOGGER.info("createChannel: connect remote host[{}] success, 
{}", addr, channelFuture.toString());
-                return cw.getChannel();
-            } else {
-                LOGGER.warn("createChannel: connect remote host[{}] failed, 
{}", addr, channelFuture.toString());
-            }
-        } else {
-            LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, 
{}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
-                channelFuture.toString());
-        }
-        return null;
+    private ChannelWrapper createChannel(String addr) {
+        String[] hostAndPort = getHostAndPort(addr);
+        ChannelFuture channelFuture = fetchBootstrap(addr)
+            .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+        LOGGER.info("createChannel: begin to connect remote host[{}] 
asynchronously", addr);
+        ChannelWrapper cw = new ChannelWrapper(addr, channelFuture);
+        this.channelTables.put(addr, cw);
+        this.channelWrapperTables.put(channelFuture.channel(), cw);
+        return cw;
     }
 
     @Override
@@ -744,38 +721,50 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         throws InterruptedException, RemotingConnectException, 
RemotingTooMuchRequestException, RemotingTimeoutException,
         RemotingSendRequestException {
         long beginStartTime = System.currentTimeMillis();
-        final Channel channel = this.getAndCreateChannel(addr);
-        String channelRemoteAddr = 
RemotingHelper.parseChannelRemoteAddr(channel);
-        if (channel != null && channel.isActive()) {
-            long costTime = System.currentTimeMillis() - beginStartTime;
-            if (timeoutMillis < costTime) {
-                throw new RemotingTooMuchRequestException("invokeAsync call 
the addr[" + channelRemoteAddr + "] timeout");
-            }
-            this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, 
new InvokeCallbackWrapper(invokeCallback, addr));
-        } else {
-            this.closeChannel(addr, channel);
-            throw new RemotingConnectException(addr);
+        final ChannelFuture channelFuture = 
this.getAndCreateChannelAsync(addr);
+        if (channelFuture == null) {
+            invokeCallback.operationFail(new RemotingConnectException(addr));
+            return;
         }
+        channelFuture.addListener(future -> {
+            if (future.isSuccess()) {
+                Channel channel = channelFuture.channel();
+                String channelRemoteAddr = 
RemotingHelper.parseChannelRemoteAddr(channel);
+                if (channel != null && channel.isActive()) {
+                    long costTime = System.currentTimeMillis() - 
beginStartTime;
+                    if (timeoutMillis < costTime) {
+                        invokeCallback.operationFail(new 
RemotingTooMuchRequestException("invokeAsync call the addr[" + 
channelRemoteAddr + "] timeout"));
+                    }
+                    this.invokeAsyncImpl(channel, request, timeoutMillis - 
costTime, new InvokeCallbackWrapper(invokeCallback, addr));
+                } else {
+                    this.closeChannel(addr, channel);
+                    invokeCallback.operationFail(new 
RemotingConnectException(addr));
+                }
+            } else {
+                invokeCallback.operationFail(new 
RemotingConnectException(addr));
+            }
+        });
     }
 
     @Override
     public void invokeOneway(String addr, RemotingCommand request, long 
timeoutMillis) throws InterruptedException,
         RemotingConnectException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
-        final Channel channel = this.getAndCreateChannel(addr);
-        String channelRemoteAddr = 
RemotingHelper.parseChannelRemoteAddr(channel);
-        if (channel != null && channel.isActive()) {
-            try {
-                doBeforeRpcHooks(channelRemoteAddr, request);
-                this.invokeOnewayImpl(channel, request, timeoutMillis);
-            } catch (RemotingSendRequestException e) {
-                LOGGER.warn("invokeOneway: send request exception, so close 
the channel[{}]", channelRemoteAddr);
-                this.closeChannel(addr, channel);
-                throw e;
-            }
-        } else {
-            this.closeChannel(addr, channel);
+        final ChannelFuture channelFuture = 
this.getAndCreateChannelAsync(addr);
+        if (channelFuture == null) {
             throw new RemotingConnectException(addr);
         }
+        channelFuture.addListener(future -> {
+            if (future.isSuccess()) {
+                Channel channel = channelFuture.channel();
+                String channelRemoteAddr = 
RemotingHelper.parseChannelRemoteAddr(channel);
+                if (channel != null && channel.isActive()) {
+                    doBeforeRpcHooks(channelRemoteAddr, request);
+                    this.invokeOnewayImpl(channel, request, timeoutMillis);
+                } else {
+                    this.closeChannel(addr, channel);
+                }
+            }
+        });
     }
 
     @Override
@@ -783,17 +772,34 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         long timeoutMillis) {
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
-            final Channel channel = this.getAndCreateChannel(addr);
-            if (channel != null && channel.isActive()) {
-                return invokeImpl(channel, request, 
timeoutMillis).whenComplete((v, t) -> {
-                    if (t == null) {
-                        updateChannelLastResponseTime(addr);
-                    }
-                }).thenApply(ResponseFuture::getResponseCommand);
-            } else {
-                this.closeChannel(addr, channel);
+            final ChannelFuture channelFuture = 
this.getAndCreateChannelAsync(addr);
+            if (channelFuture == null) {
                 future.completeExceptionally(new 
RemotingConnectException(addr));
+                return future;
             }
+            channelFuture.addListener(f -> {
+                if (f.isSuccess()) {
+                    Channel channel = channelFuture.channel();
+                    if (channel != null && channel.isActive()) {
+                        invokeImpl(channel, request, 
timeoutMillis).whenComplete((v, t) -> {
+                            if (t == null) {
+                                updateChannelLastResponseTime(addr);
+                            }
+                        
}).thenApply(ResponseFuture::getResponseCommand).whenComplete((v, t) -> {
+                            if (t != null) {
+                                future.completeExceptionally(t);
+                            } else {
+                                future.complete(v);
+                            }
+                        });
+                    } else {
+                        this.closeChannel(addr, channel);
+                        future.completeExceptionally(new 
RemotingConnectException(addr));
+                    }
+                } else {
+                    future.completeExceptionally(new 
RemotingConnectException(addr));
+                }
+            });
         } catch (Throwable t) {
             future.completeExceptionally(t);
         }
@@ -824,18 +830,37 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                     });
                     if (channelWrapper != null) {
                         if (nettyClientConfig.isEnableTransparentRetry()) {
-                            long duration = 
stopwatch.elapsed(TimeUnit.MILLISECONDS);
-                            stopwatch.stop();
                             RemotingCommand retryRequest = 
RemotingCommand.createRequestCommand(request.getCode(), 
request.readCustomHeader());
                             retryRequest.setBody(request.getBody());
-                            Channel retryChannel;
                             if (channelWrapper.isOK()) {
-                                retryChannel = channelWrapper.getChannel();
+                                long duration = 
stopwatch.elapsed(TimeUnit.MILLISECONDS);
+                                stopwatch.stop();
+                                Channel retryChannel = 
channelWrapper.getChannel();
+                                if (retryChannel != null && channel != 
retryChannel) {
+                                    return super.invokeImpl(retryChannel, 
retryRequest, timeoutMillis - duration);
+                                }
                             } else {
-                                retryChannel = 
waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper);
-                            }
-                            if (retryChannel != null && channel != 
retryChannel) {
-                                return super.invokeImpl(retryChannel, 
retryRequest, timeoutMillis - duration);
+                                CompletableFuture<ResponseFuture> future = new 
CompletableFuture<>();
+                                ChannelFuture channelFuture = 
channelWrapper.getChannelFuture();
+                                channelFuture.addListener(f -> {
+                                    long duration = 
stopwatch.elapsed(TimeUnit.MILLISECONDS);
+                                    stopwatch.stop();
+                                    if (f.isSuccess()) {
+                                        Channel retryChannel0 = 
channelFuture.channel();
+                                        if (retryChannel0 != null && channel 
!= retryChannel0) {
+                                            super.invokeImpl(retryChannel0, 
retryRequest, timeoutMillis - duration).whenComplete((v, t) -> {
+                                                if (t != null) {
+                                                    
future.completeExceptionally(t);
+                                                } else {
+                                                    future.complete(v);
+                                                }
+                                            });
+                                        }
+                                    } else {
+                                        future.completeExceptionally(new 
RemotingConnectException(channelWrapper.channelAddress));
+                                    }
+                                });
+                                return future;
                             }
                         }
                     }

Reply via email to