Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141377486 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException; +import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; +import org.apache.flink.runtime.io.network.netty.exception.TransportException; +import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; + +class CreditBasedClientHandler extends ChannelInboundHandlerAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class); + + private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>(); + + private final AtomicReference<Throwable> channelError = new AtomicReference<>(); + + /** + * Set of cancelled partition requests. A request is cancelled iff an input channel is cleared + * while data is still coming in for this channel. 
+ */ + private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap(); + + private volatile ChannelHandlerContext ctx; + + // ------------------------------------------------------------------------ + // Input channel/receiver registration + // ------------------------------------------------------------------------ + + void addInputChannel(RemoteInputChannel listener) throws IOException { + checkError(); + + if (!inputChannels.containsKey(listener.getInputChannelId())) { + inputChannels.put(listener.getInputChannelId(), listener); + } + } + + void removeInputChannel(RemoteInputChannel listener) { + inputChannels.remove(listener.getInputChannelId()); + } + + void cancelRequestFor(InputChannelID inputChannelId) { + if (inputChannelId == null || ctx == null) { + return; + } + + if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) { + ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId)); + } + } + + // ------------------------------------------------------------------------ + // Network events + // ------------------------------------------------------------------------ + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + if (this.ctx == null) { + this.ctx = ctx; + } + + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + // Unexpected close. In normal operation, the client closes the connection after all input + // channels have been removed. This indicates a problem with the remote task manager. + if (!inputChannels.isEmpty()) { + final SocketAddress remoteAddr = ctx.channel().remoteAddress(); + + notifyAllChannelsOfErrorAndClose(new RemoteTransportException( + "Connection unexpectedly closed by remote task manager '" + remoteAddr + "'. 
" + + "This might indicate that the remote task manager was lost.", + remoteAddr)); + } + + super.channelInactive(ctx); + } + + /** + * Called on exceptions in the client handler pipeline. + * + * <p> Remote exceptions are received as regular payload. + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + + if (cause instanceof TransportException) { + notifyAllChannelsOfErrorAndClose(cause); + } else { + final SocketAddress remoteAddr = ctx.channel().remoteAddress(); + + final TransportException tex; + + // Improve on the connection reset by peer error message + if (cause instanceof IOException + && cause.getMessage().equals("Connection reset by peer")) { + + tex = new RemoteTransportException("Lost connection to task manager '" + remoteAddr + "'. " + + "This indicates that the remote task manager was lost.", remoteAddr, cause); + } else { + tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause); + } + + notifyAllChannelsOfErrorAndClose(tex); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { + decodeMsg(msg); + } catch (Throwable t) { + notifyAllChannelsOfErrorAndClose(t); + } + } + + private void notifyAllChannelsOfErrorAndClose(Throwable cause) { + if (channelError.compareAndSet(null, cause)) { + try { + for (RemoteInputChannel inputChannel : inputChannels.values()) { + inputChannel.onError(cause); + } + } catch (Throwable t) { + // We can only swallow the Exception at this point. :( + LOG.warn("An Exception was thrown during error notification of a remote input channel.", t); + } finally { + inputChannels.clear(); + + if (ctx != null) { + ctx.close(); + } + } + } + } + + // ------------------------------------------------------------------------ + + /** + * Checks for an error and rethrows it if one was reported. 
+ */ + private void checkError() throws IOException { + final Throwable t = channelError.get(); + + if (t != null) { + if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("There has been an error in the channel.", t); + } + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); --- End diff -- This is also kept the same as in the current `PartitionRequestClientHandler`; I can remove it.
---