AHeise commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r435737338
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -387,12 +378,28 @@ synchronized void resetReceivedBarriers(long checkpointId) {
return allBarriersReceivedFuture;
}
- synchronized void onChannelClosed() {
+ synchronized boolean onChannelClosed() throws IOException {
numOpenChannels--;
+
+ if (numBarriersReceived > 0) {
+ resetReceivedBarriers();
+ notifyAbort(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
Review comment:
Can't we always move this abort to the caller side? Then the return value of `onChannelClosed` would effectively mean `shouldBeAborted` and would be symmetric to `processCancellationBarrier`.
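A minimal sketch of the suggested split, as a heavily simplified and hypothetical model: `UnalignerState` and `Caller` are illustrative names, not Flink's real classes, and the real `processCancellationBarrier` signature may differ. `onChannelClosed()` only resets state and reports "should be aborted"; the caller performs the abort, exactly as it would after a cancellation barrier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ThreadSafeUnaligner: it resets state and reports,
// but never triggers the abort itself.
class UnalignerState {
    private int numOpenChannels;
    private int numBarriersReceived;
    private long currentReceivedCheckpointId = -1;

    UnalignerState(int numOpenChannels) {
        this.numOpenChannels = numOpenChannels;
    }

    synchronized void onBarrier(long checkpointId) {
        currentReceivedCheckpointId = checkpointId;
        numBarriersReceived++;
    }

    /** Returns true ("shouldBeAborted") if a checkpoint was in progress when the channel closed. */
    synchronized boolean onChannelClosed() {
        numOpenChannels--;
        if (numBarriersReceived > 0) {
            numBarriersReceived = 0; // reset only; the caller decides to abort
            return true;
        }
        return false;
    }

    /** Same contract as onChannelClosed(): reset and report, the caller aborts. */
    synchronized boolean processCancellationBarrier(long cancelledId) {
        if (numBarriersReceived > 0 && cancelledId == currentReceivedCheckpointId) {
            numBarriersReceived = 0;
            return true;
        }
        return false;
    }

    synchronized long getCurrentCheckpointId() {
        return currentReceivedCheckpointId;
    }

    synchronized int getNumOpenChannels() {
        return numOpenChannels;
    }
}

// Hypothetical caller: both code paths handle the abort in the same place.
class Caller {
    final UnalignerState state;
    final List<String> aborts = new ArrayList<>();

    Caller(UnalignerState state) {
        this.state = state;
    }

    void channelClosed() {
        if (state.onChannelClosed()) {
            // Single-threaded demo: reading the id in a separate call is fine here;
            // a concurrent version would need to capture it atomically.
            notifyAbort(state.getCurrentCheckpointId(), "input end of stream");
        }
    }

    void cancellationBarrier(long checkpointId) {
        if (state.processCancellationBarrier(checkpointId)) {
            notifyAbort(checkpointId, "cancellation barrier");
        }
    }

    private void notifyAbort(long checkpointId, String reason) {
        aborts.add(checkpointId + ":" + reason);
    }
}
```

With this shape, the abort decision lives in one place on the caller side, and both methods share the same boolean contract.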
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -404,8 +400,14 @@ synchronized int getNumOpenChannels() {
return numOpenChannels;
}
+ @VisibleForTesting
synchronized long getCurrentCheckpointId() {
return currentReceivedCheckpointId;
}
+
+ @VisibleForTesting
+ boolean isCheckpointPending() {
Review comment:
A bit unrelated to your change, but shouldn't that method also be `synchronized`?
Also, is it necessary to move the method?
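For illustration, a minimal sketch of what "also synchronized" would mean, assuming `numBarriersReceived` is only written inside `synchronized` methods as in the diff. `PendingTracker` is a hypothetical stand-in, not the real class. Under the Java Memory Model, an unsynchronized read has no happens-before edge to those writes and may observe a stale value.

```java
// Hypothetical stand-in: every access to numBarriersReceived, including reads,
// goes through the same monitor ("this").
class PendingTracker {
    private int numBarriersReceived; // guarded by "this"

    synchronized void onBarrier() {
        numBarriersReceived++;
    }

    synchronized void reset() {
        numBarriersReceived = 0;
    }

    // Without "synchronized" (or a volatile field), a thread other than the
    // writer is not guaranteed to see the latest value.
    synchronized boolean isCheckpointPending() {
        return numBarriersReceived > 0;
    }
}
```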
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -117,17 +117,6 @@
threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels,
checkNotNull(channelStateWriter), this);
}
- @Override
- public void releaseBlocksAndResetBarriers() {
- if (isCheckpointPending()) {
- // make sure no additional data is persisted
- Arrays.fill(hasInflightBuffers, false);
- // the next barrier that comes must assume it is the first
- numBarrierConsumed = 0;
- }
- threadSafeUnaligner.resetReceivedBarriers(currentConsumedCheckpointId);
- }
-
Review comment:
If we change how this method is used, I'd consider this more than a hotfix. In that case, I'd also like the commit message to explain why the method is removed; from this commit alone, I don't understand the reason.
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -409,5 +416,15 @@ synchronized long getCurrentCheckpointId() {
boolean isCheckpointPending() {
return numBarriersReceived > 0;
}
+
+ private void resetReceivedBarriers() {
+ Arrays.fill(storeNewBuffers, false);
+ numBarriersReceived = 0;
+ }
+
+ private void notifyAbort(CheckpointException exception) throws IOException {
+ long currentCheckpointId = currentReceivedCheckpointId;
+ handler.executeInTaskThread(() -> handler.notifyAbort(currentCheckpointId, exception), "notifyAbort");
+ }
Review comment:
I think adding a guard to `handler.executeInTaskThread(() -> handler.notifyCheckpoint(barrier), "notifyCheckpoint");` so that it never executes an already cancelled checkpoint is a good idea in any case. But I also see that it is not enough on its own.
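A rough sketch of such a guard, in a single-threaded hypothetical model: the queue stands in for `handler.executeInTaskThread(...)`, and `GuardedNotifier`, `abortedCheckpointIds`, and the event strings are illustrative only. The deferred action re-checks, at the moment the task thread runs it, whether the checkpoint was aborted after it was enqueued. As noted above, this guard alone does not cover every case.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical model: actions are deferred to a "task thread" queue, and the
// notifyCheckpoint action skips checkpoints that were aborted in the meantime.
class GuardedNotifier {
    private final Queue<Runnable> taskThreadQueue = new ArrayDeque<>();
    private final Set<Long> abortedCheckpointIds = new HashSet<>();
    final List<String> events = new ArrayList<>();

    // Called once all barriers of checkpointId have been received.
    synchronized void onAllBarriersReceived(long checkpointId) {
        taskThreadQueue.add(() -> {
            // Guard: the abort may have happened after this action was enqueued.
            if (isAborted(checkpointId)) {
                events.add("skipped " + checkpointId);
                return;
            }
            events.add("notifyCheckpoint " + checkpointId);
        });
    }

    synchronized void abort(long checkpointId) {
        abortedCheckpointIds.add(checkpointId); // visible to the guard immediately
        taskThreadQueue.add(() -> events.add("notifyAbort " + checkpointId));
    }

    private synchronized boolean isAborted(long checkpointId) {
        return abortedCheckpointIds.contains(checkpointId);
    }

    private synchronized Runnable poll() {
        return taskThreadQueue.poll();
    }

    // Runs the deferred actions in FIFO order, as the task thread would.
    void runTaskThread() {
        Runnable action;
        while ((action = poll()) != null) {
            action.run();
        }
    }
}
```

For example, enqueuing `onAllBarriersReceived(5)`, then `abort(5)`, then `onAllBarriersReceived(6)` makes the task thread record `skipped 5`, `notifyAbort 5`, `notifyCheckpoint 6`.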
----------------------------------------------------------------
This is an automated message from the Apache Git Service.