[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16315835#comment-16315835
 ] 

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r157760652
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---
    @@ -318,192 +307,56 @@ else if (bufferProvider.isDestroyed()) {
                                MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
                                Buffer buffer = new Buffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
     
    -                           inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1);
    -
    -                           return true;
    -                   }
    -           }
    -           finally {
    -                   if (releaseNettyBuffer) {
    -                           bufferOrEvent.releaseBuffer();
    +                           inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                        }
    +           } finally {
    +                   bufferOrEvent.releaseBuffer();
                }
        }
     
    -   /**
    -    * This class would be replaced by CreditBasedClientHandler in the final,
    -    * so we only implement this method in CreditBasedClientHandler.
    -    */
    -   void notifyCreditAvailable(RemoteInputChannel inputChannel) {
    -   }
    -
    -   private class AsyncErrorNotificationTask implements Runnable {
    -
    -           private final Throwable error;
    -
    -           public AsyncErrorNotificationTask(Throwable error) {
    -                   this.error = error;
    -           }
    -
    -           @Override
    -           public void run() {
    -                   notifyAllChannelsOfErrorAndClose(error);
    -           }
    -   }
    -
    -   /**
    -    * A buffer availability listener, which subscribes/unsubscribes the NIO
    -    * read event.
    -    *
    -    * <p>If no buffer is available, the channel read event will be unsubscribed
    -    * until one becomes available again.
    -    *
    -    * <p>After a buffer becomes available again, the buffer is handed over by
    -    * the thread calling {@link #notifyBufferAvailable(Buffer)} to the network I/O
    -    * thread, which then continues the processing of the staged buffer.
    -    */
    -   private class BufferListenerTask implements BufferListener, Runnable {
    -
    -           private final AtomicReference<Buffer> availableBuffer = new AtomicReference<Buffer>();
    -
    -           private NettyMessage.BufferResponse stagedBufferResponse;
    -
    -           private boolean waitForBuffer(BufferProvider bufferProvider, NettyMessage.BufferResponse bufferResponse) {
    -
    -                   stagedBufferResponse = bufferResponse;
    -
    -                   if (bufferProvider.addBufferListener(this)) {
    -                           if (ctx.channel().config().isAutoRead()) {
    -                                   ctx.channel().config().setAutoRead(false);
    -                           }
    -
    -                           return true;
    -                   }
    -                   else {
    -                           stagedBufferResponse = null;
    -
    -                           return false;
    -                   }
    -           }
    -
    -           private boolean hasStagedBufferOrEvent() {
    -                   return stagedBufferResponse != null;
    -           }
    -
    -           public void notifyBufferDestroyed() {
    -                   // The buffer pool has been destroyed
    -                   stagedBufferResponse = null;
    -
    -                   if (stagedMessages.isEmpty()) {
    -                           ctx.channel().config().setAutoRead(true);
    -                           ctx.channel().read();
    -                   }
    -                   else {
    -                           ctx.channel().eventLoop().execute(stagedMessagesHandler);
    -                   }
    +   private void writeAndFlushNextMessageIfPossible(Channel channel) {
    --- End diff --
    
    Please re-add the comment here, too.


> Implement Netty sender incoming pipeline for credit-based
> ---------------------------------------------------------
>
>                 Key: FLINK-7456
>                 URL: https://issues.apache.org/jira/browse/FLINK-7456
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer
> {{currentCredit}} that tracks the credit granted by the receiver. When a
> {{PartitionRequest}} or {{AddCredit}} message arrives, {{currentCredit}} is
> increased by the delta it carries.
> Each view also maintains an atomic boolean that marks it as registered as
> available for transfer, which ensures it is enqueued in the handler only once.
> If {{currentCredit}} increases from zero and the subpartition has buffers
> available, the corresponding view is enqueued for transferring
> data.
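
The bookkeeping described above could look roughly like the sketch below. It is only an illustration under assumptions, not the code from the pull request: apart from currentCredit, every name here (CreditSketch, View, Handler, registeredAvailable, buffersAvailable, addCredit, enqueueOnce) is hypothetical, and the queue is assumed to be touched only by a single network I/O thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of sender-side credit accounting; not Flink's actual classes. */
public class CreditSketch {

    /** Stand-in for a sender-side subpartition view. */
    static final class View {
        /** Credit announced by the receiver and not yet used. */
        final AtomicInteger currentCredit = new AtomicInteger(0);
        /** True while the view sits in the handler's queue (assumed field name). */
        final AtomicBoolean registeredAvailable = new AtomicBoolean(false);
        /** Assumed counter of buffers waiting in the subpartition. */
        final AtomicInteger buffersAvailable = new AtomicInteger(0);
    }

    /** Stand-in for the sender-side handler that holds views ready to transfer. */
    static final class Handler {
        /** Assumed to be accessed from one I/O thread only. */
        final Queue<View> availableViews = new ArrayDeque<>();

        /** Applies the delta carried by a PartitionRequest or AddCredit message. */
        void addCredit(View view, int delta) {
            int previous = view.currentCredit.getAndAdd(delta);
            // Enqueue only when credit rises from zero and there is data to send.
            if (previous == 0 && delta > 0 && view.buffersAvailable.get() > 0) {
                enqueueOnce(view);
            }
        }

        /** The compare-and-set lets exactly one caller enqueue the view. */
        private void enqueueOnce(View view) {
            if (view.registeredAvailable.compareAndSet(false, true)) {
                availableViews.add(view);
            }
        }
    }

    public static void main(String[] args) {
        Handler handler = new Handler();
        View view = new View();
        view.buffersAvailable.set(2);

        handler.addCredit(view, 2); // credit goes 0 -> 2, view is enqueued
        handler.addCredit(view, 1); // credit goes 2 -> 3, no second enqueue

        System.out.println("queued views: " + handler.availableViews.size());
        System.out.println("credit: " + view.currentCredit.get());
    }
}
```

In this sketch the flag is never reset. A real handler would presumably clear it when the view is dequeued or runs out of credit or data, so that a later credit announcement can register the view again.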



