pnowojski commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r665342075



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -315,6 +326,17 @@ BufferAndBacklog pollBuffer() {
                 if (buffer.readableBytes() > 0) {
                     break;
                 }
+
+                // if we have an empty finished buffer and the exclusive credit is 0, we just return
+                // the empty buffer so that the downstream task can release the allocated credit for
+                // this empty buffer, this happens in two main scenarios currently:
+                // 1. all data of a buffer builder has been read and after that the buffer builder
+                // is finished
+                // 2. in approximate recovery mode, a partial record takes a whole buffer builder
+                if (buffersPerChannel == 0 && bufferConsumer.isFinished()) {
+                    break;
+                }
+

Review comment:
If there are two buffers and the first one is empty, the second one can be:
1. a non-empty buffer
2. an empty buffer
3. an event

In all three cases, as far as I understand, instead of sending the first empty
buffer with the backlog information, it should be possible to send the second
item (whichever of the above it is) with the updated backlog information. From
the downstream node's perspective, it should make no difference if we hide this
empty buffer. In the 2nd or 3rd case, we should still be able to release the
floating buffer regardless, shouldn't we?
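
To make the suggestion concrete, here is a minimal sketch of the idea. It is not Flink code: `SkipEmptyBufferSketch`, `Kind`, `Polled`, `backlogOf` and `pollSkippingEmpty` are hypothetical names, and the backlog is simplified to "number of buffers still queued, events not counted", which is an assumption of this model. It only illustrates hiding a leading empty buffer and sending the next item together with the updated backlog.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class SkipEmptyBufferSketch {

    /** Hypothetical stand-ins for the three cases listed above. */
    public enum Kind { DATA_BUFFER, EMPTY_BUFFER, EVENT }

    /** What gets sent downstream: one item plus the backlog left behind it. */
    public static final class Polled {
        public final Kind kind;
        public final int backlog;

        Polled(Kind kind, int backlog) {
            this.kind = kind;
            this.backlog = backlog;
        }
    }

    /** Simplified backlog (assumption): queued buffers, events not counted. */
    public static int backlogOf(Deque<Kind> queue) {
        int buffers = 0;
        for (Kind kind : queue) {
            if (kind != Kind.EVENT) {
                buffers++;
            }
        }
        return buffers;
    }

    /**
     * Drops leading empty buffers while something else is queued behind them,
     * then returns the next item with the backlog computed after removal.
     * Returns null if the queue is empty.
     */
    public static Polled pollSkippingEmpty(Deque<Kind> queue) {
        while (queue.size() > 1 && queue.peekFirst() == Kind.EMPTY_BUFFER) {
            queue.pollFirst(); // hidden from the downstream task
        }
        Kind head = queue.pollFirst();
        if (head == null) {
            return null;
        }
        return new Polled(head, backlogOf(queue));
    }

    public static void main(String[] args) {
        // One run per case: the first buffer is empty, the second one varies.
        for (Kind second : Kind.values()) {
            Deque<Kind> queue =
                    new ArrayDeque<>(Arrays.asList(Kind.EMPTY_BUFFER, second));
            Polled polled = pollSkippingEmpty(queue);
            System.out.println(second + " -> sent " + polled.kind
                    + ", backlog " + polled.backlog);
        }
    }
}
```

In this model the downstream side only ever sees the second item and a backlog of 0 in all three cases, which is the observation behind the question above. Whether the real credit accounting permits this is exactly what the question asks.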





