pnowojski commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r666302680
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -517,16 +520,16 @@ private void increaseBuffersInBacklog(BufferConsumer buffer) {
         }
     }
-    /**
-     * Gets the number of non-event buffers in this subpartition.
-     *
-     * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access
-     * scenarios since it does not make any concurrency guarantees.
-     */
-    @SuppressWarnings("FieldAccessNotGuarded")
-    @VisibleForTesting
+    /** Gets the number of non-event buffers in this subpartition. */
+    @Override
     public int getBuffersInBacklog() {

Review comment:
nit: Rename to `getBuffersInBacklogUnsafe()` (previously it was a private method just made `@VisibleForTesting`).

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -357,11 +359,26 @@ public void resumeConsumption() throws IOException {
         checkState(!isReleased.get(), "Channel released.");
         checkPartitionRequestQueueInitialized();
+        if (initialCredit == 0) {
+            unannouncedCredit.set(0);

Review comment:
OK, can you at least add a comment explaining why this is set to `0` here?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -312,6 +323,16 @@ BufferAndBacklog pollBuffer() {
                     decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
                 }
+                // if we have an empty finished buffer and the exclusive credit is 0, we just return
+                // the empty buffer so that the downstream task can release the allocated credit for
+                // this empty buffer, this happens in two main scenarios currently:
+                // 1. all data of a buffer builder has been read and after that the buffer builder
+                // is finished
+                // 2. in approximate recovery mode, a partial record takes a whole buffer builder
+                if (buffersPerChannel == 0 && bufferConsumer.isFinished()) {
+                    break;
+                }
+

Review comment:
I cannot seem to respond in the previous thread, so I need to start a new one.

> Let's maybe focus on the 3rd case first and we assume that the exclusive credit is 0.
>
> 1. There is only one data buffer in the queue.
> 2. Flush triggered.
> 3. All data of the first buffer is committed but the buffer is still not finished.
> 4. All data of the buffer is consumed by pollBuffer and the available credit becomes 0.
> 5. The first buffer is finished, the second event is added and the data available notification is triggered.
> 6. The upstream announces backlog to the downstream to request a credit.
> 7. The upstream receives available credit and starts to pollBuffer.
> 8. Skip the first empty buffer and send the second event.
> 9. The downstream receives the event but the event does not consume any credit.
>
> Do you mean we should change the current logic and release the floating buffer for events in some cases (including reducing the available credit by 1 at the upstream; currently the available credit is not decreased for events)?

No, but I think we could send this regardless of whether any credit is available, as we are doing right now. I think we are also already attaching information about the backlog to such events. One thing to add (unless we are already doing it) would be to use this backlog information to maybe release floating buffers if the backlog dropped to 0?

> If there are multiple empty buffers, should we just skip the first one or should we skip all?

We could skip all of them until we reach one of three cases:
1. a non-empty data buffer,
2. an event (see above),
3. the last empty buffer, with no events after it; here we would indeed need to send that empty buffer.
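The stopping rule proposed in the last reply can be sketched in Java as follows. This is a hypothetical model, not Flink's `PipelinedSubpartition#pollBuffer`: `SkipEmptyBuffersSketch`, `QueuedItem` and `pollSkippingEmptyBuffers` are invented for illustration, every queued item is assumed to be already finished, and the exclusive credit is assumed to be 0 (the `buffersPerChannel == 0` case in the diff).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of the proposed rule for skipping empty buffers; not Flink code. */
public class SkipEmptyBuffersSketch {

    /** Stand-in for a finished BufferConsumer sitting in the subpartition queue. */
    record QueuedItem(String name, boolean isEvent, int readableBytes) {
        boolean isEmptyDataBuffer() {
            return !isEvent && readableBytes == 0;
        }
    }

    /**
     * Drops leading empty data buffers and returns the next item to send. Stops at
     * (1) a non-empty data buffer, (2) an event, or (3) the last empty buffer with
     * nothing queued after it, which is still sent so the receiver can release the
     * credit it allocated for it. Returns null if the queue is empty.
     */
    static QueuedItem pollSkippingEmptyBuffers(Deque<QueuedItem> queue) {
        while (!queue.isEmpty()) {
            QueuedItem head = queue.pollFirst();
            if (!head.isEmptyDataBuffer() || queue.isEmpty()) {
                return head; // case (1) or (2), or case (3) when nothing follows
            }
            // An empty data buffer with more items behind it: skip it.
        }
        return null;
    }

    public static void main(String[] args) {
        Deque<QueuedItem> queue = new ArrayDeque<>();
        queue.add(new QueuedItem("empty-1", false, 0));
        queue.add(new QueuedItem("empty-2", false, 0));
        queue.add(new QueuedItem("event", true, 0));
        System.out.println(pollSkippingEmptyBuffers(queue).name()); // prints "event"

        queue.clear();
        queue.add(new QueuedItem("empty-1", false, 0));
        queue.add(new QueuedItem("empty-2", false, 0));
        System.out.println(pollSkippingEmptyBuffers(queue).name()); // prints "empty-2"
    }
}
```

Sending the trailing empty buffer in case (3) follows the intent of the `break` in the diff: the downstream task can only release the credit allocated for that buffer once it actually receives it.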
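The backlog-based release suggested in the reply could look roughly like this on the receiving side. It is a hypothetical sketch: `FloatingBufferReleaseSketch`, `onSenderBacklog` and the two counters are invented for illustration and are not Flink APIs. It deliberately leaves out how the sender's count of available credit would be reduced to match, which is the open question raised in the quoted comment.

```java
/**
 * Hypothetical receiver-side bookkeeping, not Flink's RemoteInputChannel: returns
 * floating buffers to a shared pool once the sender reports a backlog of 0.
 */
public class FloatingBufferReleaseSketch {

    private int floatingBuffersHeld; // floating buffers currently assigned to this channel
    private int sharedPoolAvailable; // stand-in for the shared pool of floating buffers

    FloatingBufferReleaseSketch(int floatingBuffersHeld, int sharedPoolAvailable) {
        this.floatingBuffersHeld = floatingBuffersHeld;
        this.sharedPoolAvailable = sharedPoolAvailable;
    }

    /** Called whenever a buffer or event carrying the sender's backlog arrives. */
    void onSenderBacklog(int backlog) {
        if (backlog == 0 && floatingBuffersHeld > 0) {
            // Nothing is queued upstream, so the floating buffers can serve other channels.
            sharedPoolAvailable += floatingBuffersHeld;
            floatingBuffersHeld = 0;
        }
    }

    int floatingBuffersHeld() {
        return floatingBuffersHeld;
    }

    int sharedPoolAvailable() {
        return sharedPoolAvailable;
    }

    public static void main(String[] args) {
        FloatingBufferReleaseSketch channel = new FloatingBufferReleaseSketch(2, 5);
        channel.onSenderBacklog(3); // sender still has data queued: keep the buffers
        channel.onSenderBacklog(0); // backlog dropped to 0: release them
        System.out.println(channel.floatingBuffersHeld() + " " + channel.sharedPoolAvailable()); // prints "0 7"
    }
}
```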
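For the naming nit on the first hunk: the class already uses an `Unsafe` suffix (`decreaseBuffersInBacklogUnsafe` in the `pollBuffer` hunk), which appears to mark methods that touch lock-guarded state without taking the lock themselves, matching the removed Javadoc's warning that the getter "does not make any concurrency guarantees". A minimal hypothetical illustration of that convention, using an invented `BacklogCounterSketch` class and a plain `Object` lock rather than the subpartition's actual lock:

```java
/** Hypothetical illustration of the "...Unsafe" naming convention; not Flink code. */
public class BacklogCounterSketch {

    private final Object lock = new Object();
    private int buffersInBacklog; // guarded by lock

    void increaseBuffersInBacklog() {
        synchronized (lock) {
            buffersInBacklog++;
        }
    }

    /** Takes the lock, so it is safe to call from any thread. */
    int getBuffersInBacklog() {
        synchronized (lock) {
            return buffersInBacklog;
        }
    }

    /**
     * Reads the field without taking the lock. The suffix tells callers to hold
     * {@code lock} themselves or to accept a possibly stale value.
     */
    int getBuffersInBacklogUnsafe() {
        return buffersInBacklog;
    }

    public static void main(String[] args) {
        BacklogCounterSketch counter = new BacklogCounterSketch();
        counter.increaseBuffersInBacklog();
        System.out.println(counter.getBuffersInBacklog()); // prints 1
        synchronized (counter.lock) {
            System.out.println(counter.getBuffersInBacklogUnsafe()); // prints 1
        }
    }
}
```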