[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295307#comment-16295307
 ] 

ASF GitHub Bot commented on FLINK-7468:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157548033
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -52,6 +54,10 @@
        /** Flag indicating whether the subpartition has been released. */
        private volatile boolean isReleased;
     
    +   /** The number of non-event buffers currently in this subpartition */
    +   @GuardedBy("buffers")
    +   private volatile int buffersInBacklog;
    --- End diff --
    
    I briefly considered relying on `buffers.size()` here to reduce complexity
    and code, but calling `ArrayDeque#size()` without synchronisation (for
    `getBuffersInBacklog()`) may then be subject to race conditions. However,
    if we picked up again the idea of returning the backlog size together with
    the buffer itself (which is retrieved under the lock), i.e. similar to the
    `BufferAndAvailability` returned by the `SequenceNumberingViewReader`, this
    would work and we would not need the `volatile` here. Since you split the
    implementations into `PipelinedSubpartition` and `SpillableSubpartition`
    anyway, this would be a viable approach again.
    What do you think? Which would you prefer?
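
To make the second option concrete, here is a minimal sketch of returning the backlog together with the polled element while holding the lock. It is only an illustration under assumptions: `BacklogSketch`, `Element`, `ElementAndBacklog` and `Subpartition` are hypothetical names, not the classes in the PR, and the real `PipelinedSubpartition` does considerably more.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch; none of these types are Flink's actual classes. */
public class BacklogSketch {

    /** Stand-in for a queued element; isBuffer == false marks an event. */
    static final class Element {
        final String payload;
        final boolean isBuffer;

        Element(String payload, boolean isBuffer) {
            this.payload = payload;
            this.isBuffer = isBuffer;
        }
    }

    /** The polled element plus the backlog observed under the same lock. */
    static final class ElementAndBacklog {
        final Element element;
        final int buffersInBacklog;

        ElementAndBacklog(Element element, int buffersInBacklog) {
            this.element = element;
            this.buffersInBacklog = buffersInBacklog;
        }
    }

    static final class Subpartition {
        private final ArrayDeque<Element> buffers = new ArrayDeque<>();

        // Guarded by "buffers". A plain int suffices in this sketch because
        // every read and write happens inside synchronized (buffers).
        private int buffersInBacklog;

        void add(Element element) {
            synchronized (buffers) {
                buffers.add(element);
                if (element.isBuffer) {
                    buffersInBacklog++; // events do not count towards the backlog
                }
            }
        }

        /** Returns null if the queue is empty. */
        ElementAndBacklog poll() {
            synchronized (buffers) {
                Element element = buffers.poll();
                if (element == null) {
                    return null;
                }
                if (element.isBuffer) {
                    buffersInBacklog--;
                }
                // Snapshot the backlog while still holding the lock.
                return new ElementAndBacklog(element, buffersInBacklog);
            }
        }
    }

    public static void main(String[] args) {
        Subpartition subpartition = new Subpartition();
        subpartition.add(new Element("buffer-1", true));
        subpartition.add(new Element("event", false));
        subpartition.add(new Element("buffer-2", true));

        ElementAndBacklog next;
        while ((next = subpartition.poll()) != null) {
            System.out.println(next.element.payload + " -> backlog " + next.buffersInBacklog);
        }
    }
}
```

With this shape the caller never reads the counter outside the lock, so no separate `getBuffersInBacklog()` accessor and no `volatile` would be needed in the sketch; whether that carries over to the actual view/reader code is for the PR to decide.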


> Implement sender backlog logic for credit-based
> -----------------------------------------------
>
>                 Key: FLINK-7468
>                 URL: https://issues.apache.org/jira/browse/FLINK-7468
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which counts only the 
> buffers in this subpartition and does not include events. The backlog is 
> incremented when a buffer is added to the subpartition and decremented when 
> a buffer is polled from it.
> The sender attaches the backlog to the {{BufferResponse}} as an absolute 
> value once the buffer is transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
