AHeise commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r437215398
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -396,21 +364,55 @@ synchronized void resetReceivedBarriers(long checkpointId) {
return allBarriersReceivedFuture;
}
- synchronized void onChannelClosed() {
+ synchronized void onChannelClosed() throws IOException {
numOpenChannels--;
+
+ if (resetPendingCheckpoint()) {
+ handler.notifyAbort(
+ currentReceivedCheckpointId,
+ new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ }
+ }
+
+ synchronized boolean setCancelledCheckpointId(long cancelledId) {
+ if (currentReceivedCheckpointId > cancelledId || (currentReceivedCheckpointId == cancelledId && numBarriersReceived == 0)) {
+ return false;
+ }
+
+ resetPendingCheckpoint();
+ currentReceivedCheckpointId = cancelledId;
+ return true;
+ }
+
+ synchronized void tryAbortPendingCheckpoint(long checkpointId, CheckpointException exception) throws IOException {
+ if (checkpointId > currentReceivedCheckpointId && resetPendingCheckpoint()) {
+ handler.notifyAbort(currentReceivedCheckpointId, exception);
+ }
}
- synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
- this.currentReceivedCheckpointId = Math.max(currentReceivedCheckpointId, this.currentReceivedCheckpointId);
+ private boolean resetPendingCheckpoint() {
+ if (numBarriersReceived == 0) {
Review comment:
Use `!isCheckpointPending()` here instead of `numBarriersReceived == 0`?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -404,8 +400,14 @@ synchronized int getNumOpenChannels() {
return numOpenChannels;
}
+ @VisibleForTesting
synchronized long getCurrentCheckpointId() {
return currentReceivedCheckpointId;
}
+
+ @VisibleForTesting
+ boolean isCheckpointPending() {
Review comment:
I'd prefer to use this method in the main code as well, rather than spreading
the knowledge of what "pending" means across many places. When I read
`isCheckpointPending()`, I understand it much more quickly than
`numBarriersReceived > 0`.
That said, it's largely a matter of personal taste, so it's up to you.
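A rough sketch of the idea, using a simplified stand-in class rather than the actual Flink code. The field and method names follow the diff, but `PendingCheckpointState` and `onBarrier` are hypothetical, and the part of `resetPendingCheckpoint()` after its first check is not visible in the diff, so its body here is an assumption:

```java
// Simplified stand-in for the barrier-tracking state; NOT the actual Flink class.
class PendingCheckpointState {
	private long currentReceivedCheckpointId = -1L;
	private int numBarriersReceived;

	// The single place that knows what "pending" means.
	synchronized boolean isCheckpointPending() {
		return numBarriersReceived > 0;
	}

	// Hypothetical helper for this sketch: records one barrier of a checkpoint.
	synchronized void onBarrier(long checkpointId) {
		if (checkpointId > currentReceivedCheckpointId) {
			currentReceivedCheckpointId = checkpointId;
			numBarriersReceived = 0;
		}
		if (checkpointId == currentReceivedCheckpointId) {
			numBarriersReceived++;
		}
	}

	// Assumed body: only the first check appears in the diff.
	synchronized boolean resetPendingCheckpoint() {
		if (!isCheckpointPending()) {
			return false;
		}
		numBarriersReceived = 0;
		return true;
	}

	// Same condition as in the diff, phrased via the helper.
	synchronized boolean setCancelledCheckpointId(long cancelledId) {
		if (currentReceivedCheckpointId > cancelledId
				|| (currentReceivedCheckpointId == cancelledId && !isCheckpointPending())) {
			return false;
		}
		resetPendingCheckpoint();
		currentReceivedCheckpointId = cancelledId;
		return true;
	}
}
```

With this shape, a later change to the definition of "pending" would only touch `isCheckpointPending()`.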
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -396,21 +364,55 @@ synchronized void resetReceivedBarriers(long checkpointId) {
return allBarriersReceivedFuture;
}
- synchronized void onChannelClosed() {
+ synchronized void onChannelClosed() throws IOException {
numOpenChannels--;
+
+ if (resetPendingCheckpoint()) {
+ handler.notifyAbort(
+ currentReceivedCheckpointId,
+ new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ }
+ }
+
+ synchronized boolean setCancelledCheckpointId(long cancelledId) {
+ if (currentReceivedCheckpointId > cancelledId || (currentReceivedCheckpointId == cancelledId && numBarriersReceived == 0)) {
+ return false;
+ }
+
+ resetPendingCheckpoint();
+ currentReceivedCheckpointId = cancelledId;
+ return true;
Review comment:
Would it make sense to return the value of `resetPendingCheckpoint`
instead of always `true`?
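To make the difference concrete, here is a minimal sketch with a simplified stand-in class (`CancelSketch` and the `...ReturningReset` method name are hypothetical, and the body of `resetPendingCheckpoint()` past its first check is assumed, since the diff does not show it):

```java
// Simplified stand-in, NOT the actual Flink class; field names follow the diff.
class CancelSketch {
	long currentReceivedCheckpointId = -1L;
	int numBarriersReceived;

	// Assumed body: the diff only shows the first check.
	private boolean resetPendingCheckpoint() {
		if (numBarriersReceived == 0) {
			return false;
		}
		numBarriersReceived = 0;
		return true;
	}

	// Variant from the PR: returns true whenever the cancellation is accepted.
	synchronized boolean setCancelledCheckpointId(long cancelledId) {
		if (currentReceivedCheckpointId > cancelledId
				|| (currentReceivedCheckpointId == cancelledId && numBarriersReceived == 0)) {
			return false;
		}
		resetPendingCheckpoint();
		currentReceivedCheckpointId = cancelledId;
		return true;
	}

	// Suggested variant: returns true only if a pending checkpoint was reset.
	synchronized boolean setCancelledCheckpointIdReturningReset(long cancelledId) {
		if (currentReceivedCheckpointId > cancelledId
				|| (currentReceivedCheckpointId == cancelledId && numBarriersReceived == 0)) {
			return false;
		}
		boolean wasPending = resetPendingCheckpoint();
		currentReceivedCheckpointId = cancelledId;
		return wasPending;
	}
}
```

The two variants differ when a cancellation for a newer checkpoint arrives while nothing is pending: the PR version returns `true`, the suggested one returns `false`. Whether callers rely on the current behavior is not visible from this hunk.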
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -396,21 +364,55 @@ synchronized void resetReceivedBarriers(long checkpointId) {
return allBarriersReceivedFuture;
}
- synchronized void onChannelClosed() {
+ synchronized void onChannelClosed() throws IOException {
numOpenChannels--;
+
+ if (resetPendingCheckpoint()) {
+ handler.notifyAbort(
+ currentReceivedCheckpointId,
+ new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ }
+ }
+
+ synchronized boolean setCancelledCheckpointId(long cancelledId) {
+ if (currentReceivedCheckpointId > cancelledId || (currentReceivedCheckpointId == cancelledId && numBarriersReceived == 0)) {
Review comment:
`numBarriersReceived == 0` -> `!isCheckpointPending()`?