AHeise commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r435737338



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -387,12 +378,28 @@ synchronized void resetReceivedBarriers(long checkpointId) {
                        return allBarriersReceivedFuture;
                }
 
-               synchronized void onChannelClosed() {
+               synchronized boolean onChannelClosed() throws IOException {
                        numOpenChannels--;
+
+                       if (numBarriersReceived > 0) {
+                               resetReceivedBarriers();
+                               notifyAbort(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));

Review comment:
       Can't we always move this abort to the caller side? Then the return
value of `onChannelClosed` would read more like `shouldBeAborted` and would be
symmetric to `processCancellationBarrier`.
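A minimal, Flink-free sketch of the suggested split, assuming a simplified state holder. `ChannelCloseState`, `ChannelCloseCaller`, `onBarrier` and the `-1` "nothing to abort" sentinel are hypothetical names for illustration, not Flink's API. Here `onChannelClosed()` only resets the per-checkpoint state and returns whether the pending checkpoint should be aborted. The caller decides whether to send the abort, similar to how a cancellation barrier is handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: the synchronized state only *reports* that the pending
// checkpoint must be aborted ("shouldBeAborted"); the caller performs the abort.
class ChannelCloseState {
    private final boolean[] storeNewBuffers;
    private int numOpenChannels;
    private int numBarriersReceived;
    private long currentReceivedCheckpointId = -1L;

    ChannelCloseState(int numChannels) {
        this.storeNewBuffers = new boolean[numChannels];
        this.numOpenChannels = numChannels;
    }

    synchronized void onBarrier(int channel, long checkpointId) {
        currentReceivedCheckpointId = checkpointId;
        storeNewBuffers[channel] = true;
        numBarriersReceived++;
    }

    // Returns true if a checkpoint was pending when the channel closed.
    // Resets the per-checkpoint state but does not notify anyone.
    synchronized boolean onChannelClosed() {
        numOpenChannels--;
        if (numBarriersReceived > 0) {
            Arrays.fill(storeNewBuffers, false);
            numBarriersReceived = 0;
            return true;
        }
        return false;
    }

    synchronized long getCurrentCheckpointId() {
        return currentReceivedCheckpointId;
    }

    synchronized int getNumOpenChannels() {
        return numOpenChannels;
    }
}

// Caller side: owns the abort decision, analogous to handling a cancellation barrier.
class ChannelCloseCaller {
    private final ChannelCloseState state;
    final List<Long> abortedCheckpoints = new ArrayList<>();

    ChannelCloseCaller(ChannelCloseState state) {
        this.state = state;
    }

    void handleChannelClosed() {
        long toAbort = -1L;
        // Hold the state's monitor (the synchronized methods lock on the same object),
        // so no new barrier can change the checkpoint id between the two calls.
        synchronized (state) {
            if (state.onChannelClosed()) {
                toAbort = state.getCurrentCheckpointId();
            }
        }
        if (toAbort >= 0) {
            notifyAbort(toAbort);
        }
    }

    private void notifyAbort(long checkpointId) {
        // Stand-in for the real abort notification.
        abortedCheckpoints.add(checkpointId);
    }
}
```

Holding the state's monitor across both calls is one way to keep "was a checkpoint pending" and "which one" consistent. Returning the checkpoint id directly from `onChannelClosed()` would be an alternative design.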

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -404,8 +400,14 @@ synchronized int getNumOpenChannels() {
                        return numOpenChannels;
                }
 
+               @VisibleForTesting
                synchronized long getCurrentCheckpointId() {
                        return currentReceivedCheckpointId;
                }
+
+               @VisibleForTesting
+               boolean isCheckpointPending() {

Review comment:
       A bit unrelated to your change, but shouldn't that also be synchronized?
       Also, is it necessary to move the method?
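Here is why the accessor probably wants `synchronized`. Under the Java Memory Model, a write made while holding a monitor is only guaranteed to be visible to a reader that later acquires the same monitor, or that reads a `volatile` field. The following is a minimal sketch; `PendingCheckpointState` and its helper method are hypothetical names, and only the field and accessor mirror the diff above.

```java
// Hypothetical sketch: every access to numBarriersReceived, reads included,
// goes through the same monitor ("this").
class PendingCheckpointState {
    private int numBarriersReceived;

    synchronized void onBarrier() {
        numBarriersReceived++;
    }

    synchronized void resetReceivedBarriers() {
        numBarriersReceived = 0;
    }

    // Without "synchronized" (or a volatile field) there is no happens-before edge
    // between a writer releasing the monitor and this read, so a thread that does
    // not otherwise synchronize with the writer may observe a stale count.
    synchronized boolean isCheckpointPending() {
        return numBarriersReceived > 0;
    }

    // Usage helper: record a barrier on another thread, then query from this one.
    // Note: Thread.join() alone already creates a happens-before edge, so this
    // demo does not reproduce the race; it only exercises the API.
    static boolean pendingAfterBarrierOnOtherThread(PendingCheckpointState state) {
        Thread writer = new Thread(state::onBarrier);
        writer.start();
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return state.isCheckpointPending();
    }
}
```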

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -117,17 +117,6 @@
                threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(channelStateWriter), this);
        }
 
-       @Override
-       public void releaseBlocksAndResetBarriers() {
-               if (isCheckpointPending()) {
-                       // make sure no additional data is persisted
-                       Arrays.fill(hasInflightBuffers, false);
-                       // the next barrier that comes must assume it is the first
-                       numBarrierConsumed = 0;
-               }
-               threadSafeUnaligner.resetReceivedBarriers(currentConsumedCheckpointId);
-       }
-

Review comment:
       If we change how this method is used, I'd consider it more than a 
hotfix. Then I'd also like to see the reason for the removal in the commit 
message (I don't understand it from this commit's perspective).

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -409,5 +416,15 @@ synchronized long getCurrentCheckpointId() {
                boolean isCheckpointPending() {
                        return numBarriersReceived > 0;
                }
+
+               private void resetReceivedBarriers() {
+                       Arrays.fill(storeNewBuffers, false);
+                       numBarriersReceived = 0;
+               }
+
+               private void notifyAbort(CheckpointException exception) throws IOException {
+                       long currentCheckpointId = currentReceivedCheckpointId;
+                       handler.executeInTaskThread(() -> handler.notifyAbort(currentCheckpointId, exception), "notifyAbort");
+               }

Review comment:
       I think adding a guard in `handler.executeInTaskThread(() -> handler.notifyCheckpoint(barrier), "notifyCheckpoint");` against executing a
cancelled checkpoint is always a good idea. But I also see that it is not
enough on its own.
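The following is a minimal sketch of such a guard, assuming a simplified model with a single task thread. The queue in `GuardedCheckpointNotifier` stands in for `handler.executeInTaskThread(...)`. The class, its methods, and the "highest aborted id" bookkeeping (which assumes monotonically increasing checkpoint ids) are all hypothetical. The guard skips a queued notification whose checkpoint was aborted after it was enqueued. It cannot undo a checkpoint that already ran before the abort arrived, which may be one reason a guard alone is not enough. For that reason, the abort itself is still enqueued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch, not Flink's API: taskThreadActions stands in for
// handler.executeInTaskThread(...); actions run later, in enqueue order.
class GuardedCheckpointNotifier {
    private final Queue<Runnable> taskThreadActions = new ConcurrentLinkedQueue<>();
    // Only touched by actions running on the (simulated) task thread.
    private final List<String> log = new ArrayList<>();
    // Highest checkpoint id aborted so far; guarded by "this".
    // Assumes checkpoint ids increase monotonically.
    private long latestAbortedCheckpointId = -1L;

    // Input-side thread: schedule the notification instead of running it directly.
    void onBarrier(long checkpointId) {
        taskThreadActions.add(() -> {
            // Guard: skip a checkpoint that was aborted after this action was enqueued.
            if (isAborted(checkpointId)) {
                log.add("skip " + checkpointId);
                return;
            }
            log.add("checkpoint " + checkpointId);
        });
    }

    // Input-side thread: mark the checkpoint as aborted, then schedule the abort
    // itself, because a checkpoint that already ran is not undone by the guard.
    void onAbort(long checkpointId) {
        synchronized (this) {
            latestAbortedCheckpointId = Math.max(latestAbortedCheckpointId, checkpointId);
        }
        taskThreadActions.add(() -> log.add("abort " + checkpointId));
    }

    private synchronized boolean isAborted(long checkpointId) {
        return checkpointId <= latestAbortedCheckpointId;
    }

    // Task thread: drain queued actions in order.
    void runTaskThreadActions() {
        Runnable action;
        while ((action = taskThreadActions.poll()) != null) {
            action.run();
        }
    }

    List<String> log() {
        return log;
    }
}
```

Suppose barrier 1, an abort of 1, and barrier 2 are enqueued before draining. The log is then `skip 1`, `abort 1`, `checkpoint 2`. If checkpoint 3 is drained before its abort arrives, the log records both `checkpoint 3` and `abort 3`.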




----------------------------------------------------------------
This is an automated message from the Apache Git Service.