[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182835#comment-16182835 ]
ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141396863

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.TransportException;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class);
+
+    private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>();
+
+    private final AtomicReference<Throwable> channelError = new AtomicReference<>();
+
+    /**
+     * Set of cancelled partition requests. A request is cancelled iff an input channel is cleared
+     * while data is still coming in for this channel.
+     */
+    private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();
+
+    private volatile ChannelHandlerContext ctx;
+
+    // ------------------------------------------------------------------------
+    // Input channel/receiver registration
+    // ------------------------------------------------------------------------
+
+    void addInputChannel(RemoteInputChannel listener) throws IOException {
+        checkError();
+
+        if (!inputChannels.containsKey(listener.getInputChannelId())) {
+            inputChannels.put(listener.getInputChannelId(), listener);
+        }
+    }
+
+    void removeInputChannel(RemoteInputChannel listener) {
+        inputChannels.remove(listener.getInputChannelId());
+    }
+
+    void cancelRequestFor(InputChannelID inputChannelId) {
+        if (inputChannelId == null || ctx == null) {
+            return;
+        }
+
+        if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) {
+            ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    // Network events
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+        if (this.ctx == null) {
+            this.ctx = ctx;
+        }
+
+        super.channelActive(ctx);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // Unexpected close. In normal operation, the client closes the connection after all input
+        // channels have been removed. This indicates a problem with the remote task manager.
+        if (!inputChannels.isEmpty()) {
+            final SocketAddress remoteAddr = ctx.channel().remoteAddress();
+
+            notifyAllChannelsOfErrorAndClose(new RemoteTransportException(
+                "Connection unexpectedly closed by remote task manager '" + remoteAddr + "'. "
+                    + "This might indicate that the remote task manager was lost.",
+                remoteAddr));
+        }
+
+        super.channelInactive(ctx);
+    }
+
+    /**
+     * Called on exceptions in the client handler pipeline.
+     *
+     * <p> Remote exceptions are received as regular payload.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+
+        if (cause instanceof TransportException) {
+            notifyAllChannelsOfErrorAndClose(cause);
+        } else {
+            final SocketAddress remoteAddr = ctx.channel().remoteAddress();
+
+            final TransportException tex;
+
+            // Improve on the connection reset by peer error message
+            if (cause instanceof IOException
+                    && cause.getMessage().equals("Connection reset by peer")) {
+
+                tex = new RemoteTransportException("Lost connection to task manager '" + remoteAddr + "'. "
+                    + "This indicates that the remote task manager was lost.", remoteAddr, cause);
+            } else {
+                tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause);
+            }
+
+            notifyAllChannelsOfErrorAndClose(tex);
+        }
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        try {
+            decodeMsg(msg);
+        } catch (Throwable t) {
+            notifyAllChannelsOfErrorAndClose(t);
+        }
+    }
+
+    private void notifyAllChannelsOfErrorAndClose(Throwable cause) {
+        if (channelError.compareAndSet(null, cause)) {
+            try {
+                for (RemoteInputChannel inputChannel : inputChannels.values()) {
+                    inputChannel.onError(cause);
+                }
+            } catch (Throwable t) {
+                // We can only swallow the Exception at this point. :(
+                LOG.warn("An Exception was thrown during error notification of a remote input channel.", t);
+            } finally {
+                inputChannels.clear();
+
+                if (ctx != null) {
+                    ctx.close();
+                }
+            }
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Checks for an error and rethrows it if one was reported.
+     */
+    private void checkError() throws IOException {
+        final Throwable t = channelError.get();
+
+        if (t != null) {
+            if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new IOException("There has been an error in the channel.", t);
+            }
+        }
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        super.channelReadComplete(ctx);
+    }
+
+    private void decodeMsg(Object msg) throws Throwable {
+        final Class<?> msgClazz = msg.getClass();
+
+        // ---- Buffer --------------------------------------------------------
+        if (msgClazz == NettyMessage.BufferResponse.class) {
+            NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
+
+            RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
+            if (inputChannel == null) {
+                bufferOrEvent.releaseBuffer();
+
+                cancelRequestFor(bufferOrEvent.receiverId);
+
+                return;
+            }
+
+            decodeBufferOrEvent(inputChannel, bufferOrEvent);
+
+        } else if (msgClazz == NettyMessage.ErrorResponse.class) {
+            // ---- Error ---------------------------------------------------------
+            NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg;
+
+            SocketAddress remoteAddr = ctx.channel().remoteAddress();
+
+            if (error.isFatalError()) {
+                notifyAllChannelsOfErrorAndClose(new RemoteTransportException(
+                    "Fatal error at remote task manager '" + remoteAddr + "'.",
+                    remoteAddr,
+                    error.cause));
+            } else {
+                RemoteInputChannel inputChannel = inputChannels.get(error.receiverId);
+
+                if (inputChannel != null) {
+                    if (error.cause.getClass() == PartitionNotFoundException.class) {
+                        inputChannel.onFailedPartitionRequest();
+                    } else {
+                        inputChannel.onError(new RemoteTransportException(
+                            "Error at remote task manager '" + remoteAddr + "'.",
+                            remoteAddr,
+                            error.cause));
+                    }
+                }
+            }
+        } else {
+            throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
+        }
+    }
+
+    private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
+        try {
+            if (bufferOrEvent.isBuffer()) {
+                // ---- Buffer ------------------------------------------------
+
+                // Early return for empty buffers. Otherwise Netty's readBytes() throws an
+                // IndexOutOfBoundsException.
+                if (bufferOrEvent.getSize() == 0) {
+                    inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber);
+                    return;
+                }
+
+                Buffer buffer = inputChannel.requestBuffer();
+                if (buffer != null) {
+                    buffer.setSize(bufferOrEvent.getSize());
+                    bufferOrEvent.getNettyBuffer().readBytes(buffer.getNioBuffer());
+
+                    inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber);
+                    inputChannel.onSenderBacklog(bufferOrEvent.backlog);
+                } else if (inputChannel.isReleased()) {
+                    cancelRequestFor(bufferOrEvent.receiverId);
+                } else {
+                    throw new IOException("There should always have available buffer for credit-based.");
+                }
+            } else {
+                // ---- Event -------------------------------------------------
+                // TODO We can just keep the serialized data in the Netty buffer and release it later at the reader
+                byte[] byteArray = new byte[bufferOrEvent.getSize()];
+                bufferOrEvent.getNettyBuffer().readBytes(byteArray);
+
+                MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
+                Buffer buffer = new Buffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
+
+                inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber);
+                inputChannel.onSenderBacklog(bufferOrEvent.backlog);

--- End diff --

yes, I guess, you can use an invalid backlog (`-1`?) to differentiate? Don't put too much effort into it since `PartitionRequestClientHandler` will be removed at some point.


> Implement Netty receiver incoming pipeline for credit-based
> ------------------------------------------------------------
>
>                 Key: FLINK-7406
>                 URL: https://issues.apache.org/jira/browse/FLINK-7406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> This is a part of the work for credit-based network flow control.
> Currently, {{PartitionRequestClientHandler}} receives and reads {{BufferResponse}} messages from the producer. It requests a buffer from the {{BufferPool}} to hold each message; if no buffer is available, the message is staged temporarily and {{autoread}} is set to false for the channel.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a buffer from the {{RemoteInputChannel}} for reading messages from the producer.
> The related tasks are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from the {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for the {{RemoteInputChannel}}, which may trigger requests for floating buffers from the {{BufferPool}}
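To make the last two items and the reviewer's suggestion above concrete, here is a minimal, hypothetical sketch of the delivery step on the receiver side: the decoded buffer is handed to the {{RemoteInputChannel}}, and the backlog is forwarded only when the producer actually announced one. The {{NO_BACKLOG}} sentinel ({{-1}}) and the helper class are illustrative assumptions, not part of PR #4509; only the {{onBuffer}} and {{onSenderBacklog}} calls come from the diff above.

{code:java}
package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;

import java.io.IOException;

/**
 * Hypothetical sketch of the reviewer's idea: mark "no backlog announced" with an
 * invalid value (-1) so that a non-credit-based handler can simply skip the backlog
 * update. This class is illustrative only and is not part of FLINK-7406 / PR #4509.
 */
class BacklogAwareDelivery {

    /** Assumed sentinel meaning "the producer did not announce a backlog". */
    static final int NO_BACKLOG = -1;

    void deliver(RemoteInputChannel inputChannel, Buffer buffer,
            int sequenceNumber, int backlog) throws IOException {
        // Hand the decoded buffer to the input channel, as decodeBufferOrEvent() does above.
        inputChannel.onBuffer(buffer, sequenceNumber);

        // Only the credit-based path carries a meaningful backlog; a negative value
        // lets the legacy PartitionRequestClientHandler skip the credit update.
        if (backlog != NO_BACKLOG) {
            inputChannel.onSenderBacklog(backlog);
        }
    }
}
{code}

As the review comment notes, it may not be worth investing much in this distinction, since {{PartitionRequestClientHandler}} will be removed at some point anyway.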