This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 3ee7bc25ce [ISSUE #7815] Use createChannelAsync for async invoke rpc (#7816) 3ee7bc25ce is described below commit 3ee7bc25cec0614ae36b0c935a37abb4ef589916 Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com> AuthorDate: Tue Feb 20 15:56:02 2024 +0800 [ISSUE #7815] Use createChannelAsync for async invoke rpc (#7816) * async --- .../remoting/netty/NettyRemotingClient.java | 227 ++++++++++++--------- 1 file changed, 126 insertions(+), 101 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 925c4f9cb2..836910f8fa 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -613,25 +613,33 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } - private Channel getAndCreateChannel(final String addr) throws InterruptedException { + private ChannelFuture getAndCreateChannelAsync(final String addr) throws InterruptedException { if (null == addr) { - return getAndCreateNameserverChannel(); + return getAndCreateNameserverChannelAsync(); } ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - return cw.getChannel(); + return cw.getChannelFuture(); } - return this.createChannel(addr); + return this.createChannelAsync(addr); + } + + private Channel getAndCreateChannel(final String addr) throws InterruptedException { + ChannelFuture channelFuture = getAndCreateChannelAsync(addr); + if (channelFuture == null) { + return null; + } + return getAndCreateChannelAsync(addr).awaitUninterruptibly().channel(); } - private Channel getAndCreateNameserverChannel() throws InterruptedException { + private ChannelFuture getAndCreateNameserverChannelAsync() throws InterruptedException { String addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - return cw.getChannel(); + return cw.getChannelFuture(); } } @@ -642,25 +650,19 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - return cw.getChannel(); + return cw.getChannelFuture(); } } if (addrList != null && !addrList.isEmpty()) { - for (int i = 0; i < addrList.size(); i++) { - int index = this.namesrvIndex.incrementAndGet(); - index = Math.abs(index); - index = index % addrList.size(); - String newAddr = addrList.get(index); - - this.namesrvAddrChoosed.set(newAddr); - LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex); - Channel channelNew = this.createChannel(newAddr); - if (channelNew != null) { - return channelNew; - } - } - throw new RemotingConnectException(addrList.toString()); + int index = this.namesrvIndex.incrementAndGet(); + index = Math.abs(index); + index = index % addrList.size(); + String newAddr = addrList.get(index); + + this.namesrvAddrChoosed.set(newAddr); + LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex); + return this.createChannelAsync(newAddr); } } catch (Exception e) { LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e); @@ -674,39 +676,23 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return null; } - private Channel createChannel(final String addr) throws InterruptedException { + private ChannelFuture createChannelAsync(final String addr) throws InterruptedException { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - return cw.getChannel(); + return cw.getChannelFuture(); } if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - boolean createNewConnection; cw = this.channelTables.get(addr); if (cw != null) { - - if (cw.isOK()) { - return cw.getChannel(); - } else if (!cw.getChannelFuture().isDone()) { - createNewConnection = false; + if (cw.isOK() || !cw.getChannelFuture().isDone()) { + return cw.getChannelFuture(); } else { this.channelTables.remove(addr); - createNewConnection = true; } - } else { - createNewConnection = true; - } - - if (createNewConnection) { - String[] hostAndPort = getHostAndPort(addr); - ChannelFuture channelFuture = fetchBootstrap(addr) - .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1])); - LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr); - cw = new ChannelWrapper(addr, channelFuture); - this.channelTables.put(addr, cw); - this.channelWrapperTables.put(channelFuture.channel(), cw); } + return createChannel(addr).getChannelFuture(); } catch (Exception e) { LOGGER.error("createChannel: create channel exception", e); } finally { @@ -716,27 +702,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } - if (cw != null) { - return waitChannelFuture(addr, cw); - } - return null; } - private Channel waitChannelFuture(String addr, ChannelWrapper cw) { - ChannelFuture channelFuture = cw.getChannelFuture(); - if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { - if (cw.isOK()) { - LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); - return cw.getChannel(); - } else { - LOGGER.warn("createChannel: connect remote host[{}] failed, {}", addr, channelFuture.toString()); - } - } else { - LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), - channelFuture.toString()); - } - return null; + private ChannelWrapper createChannel(String addr) { + String[] hostAndPort = getHostAndPort(addr); + ChannelFuture channelFuture = fetchBootstrap(addr) + .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr); + ChannelWrapper cw = new ChannelWrapper(addr, channelFuture); + this.channelTables.put(addr, cw); + this.channelWrapperTables.put(channelFuture.channel(), cw); + return cw; } @Override @@ -744,38 +721,50 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { long beginStartTime = System.currentTimeMillis(); - final Channel channel = this.getAndCreateChannel(addr); - String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); - if (channel != null && channel.isActive()) { - long costTime = System.currentTimeMillis() - beginStartTime; - if (timeoutMillis < costTime) { - throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout"); - } - this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr)); - } else { - this.closeChannel(addr, channel); - throw new RemotingConnectException(addr); + final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr); + if (channelFuture == null) { + invokeCallback.operationFail(new RemotingConnectException(addr)); + return; } + channelFuture.addListener(future -> { + if (future.isSuccess()) { + Channel channel = channelFuture.channel(); + String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); + if (channel != null && channel.isActive()) { + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeoutMillis < costTime) { + invokeCallback.operationFail(new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout")); + } + this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr)); + } else { + this.closeChannel(addr, channel); + invokeCallback.operationFail(new RemotingConnectException(addr)); + } + } else { + invokeCallback.operationFail(new RemotingConnectException(addr)); + } + }); } @Override public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { - final Channel channel = this.getAndCreateChannel(addr); - String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); - if (channel != null && channel.isActive()) { - try { - doBeforeRpcHooks(channelRemoteAddr, request); - this.invokeOnewayImpl(channel, request, timeoutMillis); - } catch (RemotingSendRequestException e) { - LOGGER.warn("invokeOneway: send request exception, so close the channel[{}]", channelRemoteAddr); - this.closeChannel(addr, channel); - throw e; - } - } else { - this.closeChannel(addr, channel); + final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr); + if (channelFuture == null) { throw new RemotingConnectException(addr); } + channelFuture.addListener(future -> { + if (future.isSuccess()) { + Channel channel = channelFuture.channel(); + String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); + if (channel != null && channel.isActive()) { + doBeforeRpcHooks(channelRemoteAddr, request); + this.invokeOnewayImpl(channel, request, timeoutMillis); + } else { + this.closeChannel(addr, channel); + } + } + }); } @Override @@ -783,17 +772,34 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti long timeoutMillis) { CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); try { - final Channel channel = this.getAndCreateChannel(addr); - if (channel != null && channel.isActive()) { - return invokeImpl(channel, request, timeoutMillis).whenComplete((v, t) -> { - if (t == null) { - updateChannelLastResponseTime(addr); - } - }).thenApply(ResponseFuture::getResponseCommand); - } else { - this.closeChannel(addr, channel); + final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr); + if (channelFuture == null) { future.completeExceptionally(new RemotingConnectException(addr)); + return future; } + channelFuture.addListener(f -> { + if (f.isSuccess()) { + Channel channel = channelFuture.channel(); + if (channel != null && channel.isActive()) { + invokeImpl(channel, request, timeoutMillis).whenComplete((v, t) -> { + if (t == null) { + updateChannelLastResponseTime(addr); + } + }).thenApply(ResponseFuture::getResponseCommand).whenComplete((v, t) -> { + if (t != null) { + future.completeExceptionally(t); + } else { + future.complete(v); + } + }); + } else { + this.closeChannel(addr, channel); + future.completeExceptionally(new RemotingConnectException(addr)); + } + } else { + future.completeExceptionally(new RemotingConnectException(addr)); + } + }); } catch (Throwable t) { future.completeExceptionally(t); } @@ -824,18 +830,37 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti }); if (channelWrapper != null) { if (nettyClientConfig.isEnableTransparentRetry()) { - long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); - stopwatch.stop(); RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader()); retryRequest.setBody(request.getBody()); - Channel retryChannel; if (channelWrapper.isOK()) { - retryChannel = channelWrapper.getChannel(); + long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); + stopwatch.stop(); + Channel retryChannel = channelWrapper.getChannel(); + if (retryChannel != null && channel != retryChannel) { + return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration); + } } else { - retryChannel = waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper); - } - if (retryChannel != null && channel != retryChannel) { - return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration); + CompletableFuture<ResponseFuture> future = new CompletableFuture<>(); + ChannelFuture channelFuture = channelWrapper.getChannelFuture(); + channelFuture.addListener(f -> { + long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); + stopwatch.stop(); + if (f.isSuccess()) { + Channel retryChannel0 = channelFuture.channel(); + if (retryChannel0 != null && channel != retryChannel0) { + super.invokeImpl(retryChannel0, retryRequest, timeoutMillis - duration).whenComplete((v, t) -> { + if (t != null) { + future.completeExceptionally(t); + } else { + future.complete(v); + } + }); + } + } else { + future.completeExceptionally(new RemotingConnectException(channelWrapper.channelAddress)); + } + }); + return future; } } }