GitHub user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141886467
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException {
                return inputGate.getBufferProvider();
        }
     
    -   public void onBuffer(Buffer buffer, int sequenceNumber) {
    +   /**
    +    * Requests buffer from input channel directly for receiving network data.
    +    * It should always return an available buffer in credit-based mode.
    +    *
    +    * @return The available buffer.
    +    */
    +   public Buffer requestBuffer() {
    +           synchronized (availableBuffers) {
    +                   return availableBuffers.poll();
    +           }
    +   }
    +
    +   /**
    +    * Receives the backlog from producer's buffer response. If the number of available
    +    * buffers is less than the backlog length, it will request floating buffers from buffer
    +    * pool, and then notify unannounced credits to the producer.
    +    * pool, and then notify unannounced credits to the producer.
    +    *
    +    * @param backlog The number of unsent buffers in the producer's sub partition.
    +    */
    +   private void onSenderBacklog(int backlog) {
    +           int numRequestedBuffers = 0;
    +
    +           synchronized (availableBuffers) {
    +                   // Important: the isReleased check should be inside the synchronized block.
    +                   if (!isReleased.get()) {
    +                           senderBacklog.set(backlog);
    +
    +                           while (senderBacklog.get() > availableBuffers.size() && !isWaitingForFloatingBuffers.get()) {
    --- End diff --
    
    I was thinking about it a bit more and talked to @StephanEwen about it, and
    we think that it is actually fine to grab all the resources we need at the
    moment. If there are not enough buffers at some point, the fair distribution
    will start when the buffers are recycled, i.e. via the callbacks of the new
    `BufferListener`. Since each channel always has its own exclusive buffers, we
    can guarantee that it always makes progress anyway! Additionally, we cannot
    really make a fair distribution from the start when receiving the first
    backlog (since we do not know all the other backlogs) unless we wait for some
    time, which we also do not want.
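    To make the "grab what you need now, share on recycle" policy above concrete, here is a minimal, single-threaded sketch. All class and method names (`FloatingPool`, `ChannelModel`, `onBufferAvailable`, ...) are hypothetical and only stand in for the real pool and `BufferListener` machinery; buffers are modelled as plain `Integer` ids, and serving waiting channels in FIFO order is an assumption, not necessarily what the actual implementation does.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical shared pool of floating buffers; buffers are modelled as Integer ids. */
class FloatingPool {
    private final Deque<Integer> free = new ArrayDeque<>();
    private final Deque<ChannelModel> listeners = new ArrayDeque<>();

    FloatingPool(int size) {
        for (int i = 0; i < size; i++) {
            free.add(i);
        }
    }

    /** Returns a free buffer, or null if the pool is exhausted. */
    Integer request() {
        return free.poll();
    }

    /** Stands in for registering a buffer listener with the pool. */
    void addListener(ChannelModel channel) {
        listeners.add(channel);
    }

    /** A recycled buffer goes to the longest-waiting channel (assumed FIFO), else back to the pool. */
    void recycle(Integer buffer) {
        ChannelModel waiting = listeners.poll();
        if (waiting != null) {
            waiting.onBufferAvailable(buffer);
        } else {
            free.add(buffer);
        }
    }

    int freeCount() {
        return free.size();
    }
}

/** Hypothetical receiver-side channel: exclusive buffers plus greedily requested floating ones. */
class ChannelModel {
    final FloatingPool pool;
    final Deque<Integer> available = new ArrayDeque<>();
    int backlog;
    boolean waiting;

    ChannelModel(FloatingPool pool, int exclusiveBuffers) {
        this.pool = pool;
        // Exclusive buffers get negative ids so they are distinguishable from floating ones.
        for (int i = 1; i <= exclusiveBuffers; i++) {
            available.add(-i);
        }
    }

    /** Grab as many floating buffers as the backlog needs right now; wait if the pool is empty. */
    void onSenderBacklog(int newBacklog) {
        backlog = newBacklog;
        while (available.size() < backlog && !waiting) {
            Integer buffer = pool.request();
            if (buffer == null) {
                waiting = true;
                pool.addListener(this);
            } else {
                available.add(buffer);
            }
        }
    }

    /** Callback from the pool once a floating buffer has been recycled. */
    void onBufferAvailable(Integer buffer) {
        waiting = false;
        if (available.size() >= backlog) {
            pool.recycle(buffer); // no longer needed: pass it on to the next waiter
            return;
        }
        available.add(buffer);
        onSenderBacklog(backlog); // still short? request more or register again
    }
}
```

    With a pool of 4 floating buffers and two channels that each own 2 exclusive buffers and see a backlog of 5, the first channel takes 3 floating buffers, the second gets the last one and registers as a listener, and the next recycled floating buffer goes straight to it. The waiting channel still holds its exclusive buffers the whole time, which is why it keeps making progress even before the redistribution kicks in.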
    
    I kind of like your idea of having a `numBuffersPerAllocation`. Let's keep
    this in mind and evaluate the current solution first to see whether we need
    this addition.
    
    Regarding the formula (which I took from the network FLIP):
    - From the FLIP, with regard to the buffer distribution strategy: `Design
      rationale 2: Each channel always tries to maintain a credit of
      ‘backlog_length + initialCredit’. That means that each channel tries to
      build the receive window for its current backlog as much as possible from
      the floating buffers, and use the exclusive ‘initialCredit’ buffers as a
      means to grow the window.` That way, we always have some buffers available
      immediately on the receiver side, so the sender can continue sending new
      buffers right away (as long as there are buffers available on the
      receiver) and we do not have to wait for the exclusive buffers to come back.
      - Note that this would have to be changed in the various checks for
        `availableBuffers.size() >= senderBacklog.get()`, e.g. in
        `RemoteInputChannel#notifyBufferAvailable()`.
      - Similarly, `RemoteInputChannel#recycle()` needs to be adapted for the
        case where our exclusive buffers are in use and we requested
        `backlog_length + initialCredit - currentCredit` *floating* buffers, so
        that we do not stack up `2*initialCredit` buffers once `backlog == 0`
        again (plus a corresponding unit test).
    - What do you mean by `backlog-currentCredit` not being very accurate? We
      guarantee that there are no more than `currentCredit` buffers on the wire
      (some already in the channel, some only announced) and, at the time the
      buffer was sent, `backlog` additional buffers were queued. So, in order to
      send them, we always need `backlog-currentCredit`, irrespective of how much
      credit is announced vs. being on the wire. Or am I missing something here?
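    The two credit targets discussed in the list above can be compared side by side in a small, hypothetical helper. `CreditTargets` and its methods are illustrative names only, not Flink code, and `currentCredit` is assumed to mean the buffers already on the wire or announced to the sender, as described in the last point. The recycle check models the adaptation needed so that floating buffers do not pile up once the backlog drops back to 0.

```java
/**
 * Hypothetical helper, not part of Flink: compares the current credit target
 * (cover the backlog) with the FLIP target (backlog + initialCredit).
 */
final class CreditTargets {

    private CreditTargets() {}

    /** Current formula: credit still needed to drain the sender's backlog. */
    static int floatingToRequestCurrent(int backlog, int currentCredit) {
        return Math.max(0, backlog - currentCredit);
    }

    /** FLIP design rationale 2: aim for backlog + initialCredit in total. */
    static int floatingToRequestFlip(int backlog, int initialCredit, int currentCredit) {
        return Math.max(0, backlog + initialCredit - currentCredit);
    }

    /**
     * Recycle check under the FLIP target: if the channel now holds more than
     * backlog + initialCredit buffers (e.g. its exclusive buffers came back after
     * the backlog dropped to 0), a floating buffer should go back to the pool
     * instead of stacking up towards 2 * initialCredit.
     */
    static boolean shouldReturnFloating(int backlog, int initialCredit, int heldBuffers) {
        return heldBuffers > backlog + initialCredit;
    }
}
```

    For example, with `backlog = 10`, `initialCredit = 2` and `currentCredit = 4`, the current formula asks for 6 more buffers while the FLIP target asks for 8; the extra `initialCredit` buffers are what lets the sender keep going without waiting for exclusive buffers to be returned.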

