Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r157760652
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---
    @@ -318,192 +307,56 @@ else if (bufferProvider.isDestroyed()) {
                                MemorySegment memSeg = 
MemorySegmentFactory.wrap(byteArray);
                                Buffer buffer = new Buffer(memSeg, 
FreeingBufferRecycler.INSTANCE, false);
     
    -                           inputChannel.onBuffer(buffer, 
bufferOrEvent.sequenceNumber, -1);
    -
    -                           return true;
    -                   }
    -           }
    -           finally {
    -                   if (releaseNettyBuffer) {
    -                           bufferOrEvent.releaseBuffer();
    +                           inputChannel.onBuffer(buffer, 
bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                        }
    +           } finally {
    +                   bufferOrEvent.releaseBuffer();
                }
        }
     
    -   /**
    -    * This class would be replaced by CreditBasedClientHandler in the 
final,
    -    * so we only implement this method in CreditBasedClientHandler.
    -    */
    -   void notifyCreditAvailable(RemoteInputChannel inputChannel) {
    -   }
    -
    -   private class AsyncErrorNotificationTask implements Runnable {
    -
    -           private final Throwable error;
    -
    -           public AsyncErrorNotificationTask(Throwable error) {
    -                   this.error = error;
    -           }
    -
    -           @Override
    -           public void run() {
    -                   notifyAllChannelsOfErrorAndClose(error);
    -           }
    -   }
    -
    -   /**
    -    * A buffer availability listener, which subscribes/unsubscribes the NIO
    -    * read event.
    -    *
    -    * <p>If no buffer is available, the channel read event will be 
unsubscribed
    -    * until one becomes available again.
    -    *
    -    * <p>After a buffer becomes available again, the buffer is handed over 
by
    -    * the thread calling {@link #notifyBufferAvailable(Buffer)} to the 
network I/O
    -    * thread, which then continues the processing of the staged buffer.
    -    */
    -   private class BufferListenerTask implements BufferListener, Runnable {
    -
    -           private final AtomicReference<Buffer> availableBuffer = new 
AtomicReference<Buffer>();
    -
    -           private NettyMessage.BufferResponse stagedBufferResponse;
    -
    -           private boolean waitForBuffer(BufferProvider bufferProvider, 
NettyMessage.BufferResponse bufferResponse) {
    -
    -                   stagedBufferResponse = bufferResponse;
    -
    -                   if (bufferProvider.addBufferListener(this)) {
    -                           if (ctx.channel().config().isAutoRead()) {
    -                                   
ctx.channel().config().setAutoRead(false);
    -                           }
    -
    -                           return true;
    -                   }
    -                   else {
    -                           stagedBufferResponse = null;
    -
    -                           return false;
    -                   }
    -           }
    -
    -           private boolean hasStagedBufferOrEvent() {
    -                   return stagedBufferResponse != null;
    -           }
    -
    -           public void notifyBufferDestroyed() {
    -                   // The buffer pool has been destroyed
    -                   stagedBufferResponse = null;
    -
    -                   if (stagedMessages.isEmpty()) {
    -                           ctx.channel().config().setAutoRead(true);
    -                           ctx.channel().read();
    -                   }
    -                   else {
    -                           
ctx.channel().eventLoop().execute(stagedMessagesHandler);
    -                   }
    +   private void writeAndFlushNextMessageIfPossible(Channel channel) {
    --- End diff --
    
    Please re-add the comment here, too.


---

Reply via email to