pnowojski commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r666302680



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -517,16 +520,16 @@ private void increaseBuffersInBacklog(BufferConsumer buffer) {
         }
     }
 
-    /**
-     * Gets the number of non-event buffers in this subpartition.
-     *
-     * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access
-     * scenarios since it does not make any concurrency guarantees.
-     */
-    @SuppressWarnings("FieldAccessNotGuarded")
-    @VisibleForTesting
+    /** Gets the number of non-event buffers in this subpartition. */
+    @Override
     public int getBuffersInBacklog() {

Review comment:
       nit: Rename to `getBuffersInBacklogUnsafe()` (previously it was effectively a private method, only exposed via `@VisibleForTesting`)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -357,11 +359,26 @@ public void resumeConsumption() throws IOException {
         checkState(!isReleased.get(), "Channel released.");
         checkPartitionRequestQueueInitialized();
 
+        if (initialCredit == 0) {
+            unannouncedCredit.set(0);

Review comment:
       Ok, can you at least add a comment explaining why this is set to `0` here?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -312,6 +323,16 @@ BufferAndBacklog pollBuffer() {
                     decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
                 }
 
+                // if we have an empty finished buffer and the exclusive credit is 0, we just return
+                // the empty buffer so that the downstream task can release the allocated credit for
+                // this empty buffer, this happens in two main scenarios currently:
+                // 1. all data of a buffer builder has been read and after that the buffer builder
+                // is finished
+                // 2. in approximate recovery mode, a partial record takes a whole buffer builder
+                if (buffersPerChannel == 0 && bufferConsumer.isFinished()) {
+                    break;
+                }
+

Review comment:
       I cannot seem to respond in the previous thread, so I need to start a new one.
   
   > Let's maybe focus on the 3rd case first and assume that the exclusive credit is 0.
   >
   > 1. There is only one data buffer in the queue.
   > 2. Flush is triggered.
   > 3. All data of the first buffer is committed but the buffer is still not finished.
   > 4. All data of the buffer is consumed by pollBuffer and the available credit becomes 0.
   > 5. The first buffer is finished, the second event is added and the data available notification is triggered.
   > 6. The upstream announces backlog to the downstream to request a credit.
   > 7. The upstream receives available credit and starts to pollBuffer.
   > 8. Skip the first empty buffer and send the second event.
   > 9. The downstream receives the event but the event does not consume any credit.
   >
   > Do you mean we should change the current logic and release the floating buffer for an event in some cases (including reducing the available credit by 1 at the upstream; currently the available credit is not decreased for an event)?
   
   No, but I think we could send this regardless of whether any credit is available, as we are doing right now. I think we are also already attaching backlog information to such events. One thing to add (unless we are already doing it) would be to use this backlog information to maybe release floating buffers if the backlog has dropped to 0.
   
   > If there are multiple empty buffers, should we just skip the first one or should we skip all?
   
   We could skip all of them, until we reach one of three cases:
   1. a non-empty data buffer
   2. an event (see above)
   3. the last empty buffer, with no events after it; here we would indeed need to send that empty buffer
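
   To make the three stop conditions above concrete, here is a minimal sketch of the proposed skipping rule. It is not the actual `PipelinedSubpartition` code: `QueuedBuffer`, `EmptyBufferSkipSketch` and `pollSkippingEmpty` are hypothetical names, the queue is a plain `Deque`, and locking, buffer recycling, backlog accounting and unfinished buffers are all left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified stand-in for a queued buffer; not Flink's BufferConsumer. */
final class QueuedBuffer {
    final boolean isEvent;
    final boolean isFinished;
    final int readableBytes;

    QueuedBuffer(boolean isEvent, boolean isFinished, int readableBytes) {
        this.isEvent = isEvent;
        this.isFinished = isFinished;
        this.readableBytes = readableBytes;
    }

    /** True for a finished data buffer that has nothing left to read. */
    boolean isEmptyFinishedData() {
        return !isEvent && isFinished && readableBytes == 0;
    }
}

/** Sketch of the "skip all empty buffers" rule (assumption: single-threaded access). */
final class EmptyBufferSkipSketch {

    /**
     * Removes empty finished data buffers from the head of the queue and returns the
     * element that should be sent next, or null if the queue is empty.
     */
    static QueuedBuffer pollSkippingEmpty(Deque<QueuedBuffer> queue) {
        while (!queue.isEmpty()) {
            QueuedBuffer head = queue.peekFirst();
            if (!head.isEmptyFinishedData()) {
                // Cases 1 and 2: a non-empty data buffer or an event stops the skipping.
                return queue.pollFirst();
            }
            if (queue.size() == 1) {
                // Case 3: the last empty buffer with nothing after it has to be sent,
                // so that the downstream can release the credit allocated for it.
                return queue.pollFirst();
            }
            // An empty buffer followed by more elements: drop it and keep looking.
            queue.pollFirst();
        }
        return null;
    }

    public static void main(String[] args) {
        Deque<QueuedBuffer> queue = new ArrayDeque<>();
        queue.add(new QueuedBuffer(false, true, 0)); // empty data buffer
        queue.add(new QueuedBuffer(false, true, 0)); // empty data buffer
        queue.add(new QueuedBuffer(true, true, 4));  // event
        QueuedBuffer next = pollSkippingEmpty(queue);
        System.out.println("next element is an event: " + next.isEvent);
    }
}
```

   In this sketch a trailing run of empty buffers collapses into a single empty buffer that is still sent, which matches case 3; whether the real implementation should also adjust the backlog while skipping is not covered here.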
   
   



