zhijiangW commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r434711700
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -387,12 +378,28 @@ synchronized void resetReceivedBarriers(long checkpointId) {
             return allBarriersReceivedFuture;
         }
-        synchronized void onChannelClosed() {
+        synchronized boolean onChannelClosed() throws IOException {
             numOpenChannels--;
+
+            if (numBarriersReceived > 0) {
+                resetReceivedBarriers();
+                notifyAbort(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+                return true;
+            }
+            return false;
         }
-        synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
-            this.currentReceivedCheckpointId = Math.max(currentReceivedCheckpointId, this.currentReceivedCheckpointId);
+        synchronized boolean setCancelledCheckpointId(long canceledCheckpointId) {
+            boolean shouldAbort = false;
+            if (canceledCheckpointId > currentReceivedCheckpointId) {
+                currentReceivedCheckpointId = canceledCheckpointId;
+                shouldAbort = true;
+
+            } else if (canceledCheckpointId == currentReceivedCheckpointId && isCheckpointPending()) {
+                resetReceivedBarriers();
+                shouldAbort = true;
+            }
+            return shouldAbort;
Review comment:
For the checkpoint cancellation case, this is still valid because the
`currentReceivedCheckpointId` returned by
`threadSafeUnaligner.getCurrentCheckpointId()` is updated via
`#setCancelledCheckpointId()`.
For the end-of-partition case, we do not couple the abort with a specific
checkpoint id, as explained in the comment above. We only abort the currently
pending checkpoint, if there is one, and allow subsequent new checkpoints to
proceed afterwards.
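
To make the two abort paths concrete, here is a minimal sketch of the bookkeeping described above. It is not the Flink class: `UnalignerStateSketch` and `onBarrier` are hypothetical names, `isCheckpointPending()` is assumed to mean "at least one barrier has been received", and the sketch also clears the barrier count when a newer checkpoint id is cancelled, which the diff does not show.

```java
// Simplified, hypothetical model of the state discussed in this review.
// Field and method names mirror the diff for readability only.
class UnalignerStateSketch {
    private long currentReceivedCheckpointId = -1L;
    private int numBarriersReceived = 0;
    private int numOpenChannels;

    UnalignerStateSketch(int numChannels) {
        this.numOpenChannels = numChannels;
    }

    // Assumption: a checkpoint is pending while at least one barrier has arrived.
    private boolean isCheckpointPending() {
        return numBarriersReceived > 0;
    }

    // Hypothetical helper: records a barrier; a newer id starts a new checkpoint.
    synchronized void onBarrier(long checkpointId) {
        if (checkpointId > currentReceivedCheckpointId) {
            currentReceivedCheckpointId = checkpointId;
            numBarriersReceived = 0;
        }
        if (checkpointId == currentReceivedCheckpointId) {
            numBarriersReceived++;
        }
    }

    // Cancellation path: tied to a specific checkpoint id.
    synchronized boolean setCancelledCheckpointId(long cancelledCheckpointId) {
        if (cancelledCheckpointId > currentReceivedCheckpointId) {
            // The cancelled id becomes the current id, so later reads see it.
            currentReceivedCheckpointId = cancelledCheckpointId;
            numBarriersReceived = 0; // sketch-only simplification
            return true;
        }
        if (cancelledCheckpointId == currentReceivedCheckpointId && isCheckpointPending()) {
            numBarriersReceived = 0;
            return true;
        }
        return false;
    }

    // End-of-partition path: not tied to any id; aborts only what is pending.
    synchronized boolean onChannelClosed() {
        numOpenChannels--;
        if (isCheckpointPending()) {
            numBarriersReceived = 0;
            return true;
        }
        return false;
    }

    synchronized long getCurrentCheckpointId() {
        return currentReceivedCheckpointId;
    }

    public static void main(String[] args) {
        UnalignerStateSketch state = new UnalignerStateSketch(2);
        state.onBarrier(1L);
        System.out.println("abort on channel close: " + state.onChannelClosed());
        state.onBarrier(2L);
        System.out.println("abort on cancel of 3: " + state.setCancelledCheckpointId(3L));
        System.out.println("current id: " + state.getCurrentCheckpointId());
    }
}
```

In this model, closing a channel leaves the current checkpoint id untouched, so a later barrier with a higher id starts a fresh checkpoint, whereas a cancellation advances the id itself.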