rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r544865125



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -451,11 +451,20 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
                                }
                                else {
                                        receivedBuffers.add(sequenceBuffer);
-                                       channelStatePersister.maybePersist(buffer);
                                        if (dataType.requiresAnnouncement()) {
                                                firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                                        }
                                }
+                               channelStatePersister
+                                       .checkForBarrier(sequenceBuffer.buffer)
+                                       .filter(id -> id > lastBarrierId)
+                                       .ifPresent(id -> {
+                                               // checkpoint was not yet started by task thread,
+                                               // so remember the numbers of buffers to spill for the time when it will be started
+                                               lastBarrierId = id;
+                                               lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
+                                       });
+                               channelStatePersister.maybePersist(buffer);

Review comment:
       > Side question, shouldn't those two fixes be separate commits?
   
   They were in the beginning, but that seemed like way too many commits :) So I grouped some related changes. I can extract it into its own commit if you prefer.
   
   > And what about test coverage for those changes?
   
   As [noted](https://github.com/apache/flink/pull/14057#discussion_r543678627) above:
   > I didn't add a unit test as after the other fixes in master (#14052) this change is not strictly necessary
   > (though I think it's still less error-prone to not update SQN unnecessarily).
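
   For illustration, the guard from the hunk above (`filter(id -> id > lastBarrierId)`) can be modelled in isolation. This is only a minimal sketch: `BarrierTracker`, its accessors, and the `-1` initial values are hypothetical and are not part of `RemoteInputChannel`; the `Optional<Long>` argument stands in for whatever `checkForBarrier` returns.

```java
import java.util.Optional;

// Hypothetical standalone model of the guard in the diff; not Flink code.
class BarrierTracker {
    // Assumed initial values, chosen only for this sketch.
    private long lastBarrierId = -1;
    private int lastBarrierSequenceNumber = -1;

    // barrierId is empty when the buffer carries no checkpoint barrier.
    void onBuffer(Optional<Long> barrierId, int sequenceNumber) {
        barrierId
            .filter(id -> id > lastBarrierId)
            .ifPresent(id -> {
                // Only a strictly newer barrier updates the remembered SQN.
                lastBarrierId = id;
                lastBarrierSequenceNumber = sequenceNumber;
            });
    }

    long getLastBarrierId() {
        return lastBarrierId;
    }

    int getLastBarrierSequenceNumber() {
        return lastBarrierSequenceNumber;
    }
}
```

   With this shape, a repeated or older barrier id leaves the recorded sequence number untouched, which is the "do not update SQN unnecessarily" behaviour a unit test could pin down.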




