lollipopjin commented on code in PR #7816:
URL: https://github.com/apache/rocketmq/pull/7816#discussion_r1477954085


##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java:
##########
@@ -715,84 +701,104 @@ private Channel createChannel(final String addr) throws 
InterruptedException {
             LOGGER.warn("createChannel: try to lock channel table, but 
timeout, {}ms", LOCK_TIMEOUT_MILLIS);
         }
 
-        if (cw != null) {
-            return waitChannelFuture(addr, cw);
-        }
-
         return null;
     }
 
-    private Channel waitChannelFuture(String addr, ChannelWrapper cw) {
-        ChannelFuture channelFuture = cw.getChannelFuture();
-        if 
(channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis()))
 {
-            if (cw.isOK()) {
-                LOGGER.info("createChannel: connect remote host[{}] success, 
{}", addr, channelFuture.toString());
-                return cw.getChannel();
-            } else {
-                LOGGER.warn("createChannel: connect remote host[{}] failed, 
{}", addr, channelFuture.toString());
-            }
-        } else {
-            LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, 
{}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
-                channelFuture.toString());
-        }
-        return null;
+    private ChannelWrapper createChannel(String addr) {
+        String[] hostAndPort = getHostAndPort(addr);
+        ChannelFuture channelFuture = fetchBootstrap(addr)
+            .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+        LOGGER.info("createChannel: begin to connect remote host[{}] 
asynchronously", addr);
+        ChannelWrapper cw = new ChannelWrapper(addr, channelFuture);
+        this.channelTables.put(addr, cw);
+        this.channelWrapperTables.put(channelFuture.channel(), cw);
+        return cw;
     }
 
     @Override
     public void invokeAsync(String addr, RemotingCommand request, long 
timeoutMillis, InvokeCallback invokeCallback)
         throws InterruptedException, RemotingConnectException, 
RemotingTooMuchRequestException, RemotingTimeoutException,
         RemotingSendRequestException {
         long beginStartTime = System.currentTimeMillis();
-        final Channel channel = this.getAndCreateChannel(addr);
-        String channelRemoteAddr = 
RemotingHelper.parseChannelRemoteAddr(channel);
-        if (channel != null && channel.isActive()) {
-            long costTime = System.currentTimeMillis() - beginStartTime;
-            if (timeoutMillis < costTime) {
-                throw new RemotingTooMuchRequestException("invokeAsync call 
the addr[" + channelRemoteAddr + "] timeout");
-            }
-            this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, 
new InvokeCallbackWrapper(invokeCallback, addr));
-        } else {
-            this.closeChannel(addr, channel);
-            throw new RemotingConnectException(addr);
+        final ChannelFuture channelFuture = 
this.getAndCreateChannelAsync(addr);
+        if (channelFuture == null) {
+            invokeCallback.operationFail(new RemotingConnectException(addr));
+            return;
         }
+        channelFuture.addListener(future -> {
+            if (future.isSuccess()) {
+                Channel channel = channelFuture.channel();
+                String channelRemoteAddr = 
RemotingHelper.parseChannelRemoteAddr(channel);
+                if (channel != null && channel.isActive()) {
+                    long costTime = System.currentTimeMillis() - 
beginStartTime;
+                    if (timeoutMillis < costTime) {
+                        invokeCallback.operationFail(new 
RemotingTooMuchRequestException("invokeAsync call the addr[" + 
channelRemoteAddr + "] timeout"));

Review Comment:
   Please add the real cost time to the exception message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to