lollipopjin commented on code in PR #7816: URL: https://github.com/apache/rocketmq/pull/7816#discussion_r1477954085
########## remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java: ########## @@ -715,84 +701,104 @@ private Channel createChannel(final String addr) throws InterruptedException { LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } - if (cw != null) { - return waitChannelFuture(addr, cw); - } - return null; } - private Channel waitChannelFuture(String addr, ChannelWrapper cw) { - ChannelFuture channelFuture = cw.getChannelFuture(); - if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { - if (cw.isOK()) { - LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); - return cw.getChannel(); - } else { - LOGGER.warn("createChannel: connect remote host[{}] failed, {}", addr, channelFuture.toString()); - } - } else { - LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), - channelFuture.toString()); - } - return null; + private ChannelWrapper createChannel(String addr) { + String[] hostAndPort = getHostAndPort(addr); + ChannelFuture channelFuture = fetchBootstrap(addr) + .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr); + ChannelWrapper cw = new ChannelWrapper(addr, channelFuture); + this.channelTables.put(addr, cw); + this.channelWrapperTables.put(channelFuture.channel(), cw); + return cw; } @Override public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { long beginStartTime = System.currentTimeMillis(); - final Channel channel = this.getAndCreateChannel(addr); - String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); - if (channel != null && channel.isActive()) { - long costTime = System.currentTimeMillis() - beginStartTime; - if (timeoutMillis < costTime) { - throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout"); - } - this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr)); - } else { - this.closeChannel(addr, channel); - throw new RemotingConnectException(addr); + final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr); + if (channelFuture == null) { + invokeCallback.operationFail(new RemotingConnectException(addr)); + return; } + channelFuture.addListener(future -> { + if (future.isSuccess()) { + Channel channel = channelFuture.channel(); + String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); + if (channel != null && channel.isActive()) { + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeoutMillis < costTime) { + invokeCallback.operationFail(new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout")); Review Comment: Please add real cost time to the Exception mesage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org