zhijiangW commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r437226642
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -396,21 +364,55 @@ synchronized void resetReceivedBarriers(long
checkpointId) {
return allBarriersReceivedFuture;
}
- synchronized void onChannelClosed() {
+ synchronized void onChannelClosed() throws IOException {
numOpenChannels--;
+
+ if (resetPendingCheckpoint()) {
+ handler.notifyAbort(
+ currentReceivedCheckpointId,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ }
+ }
+
+ synchronized boolean setCancelledCheckpointId(long cancelledId)
{
+ if (currentReceivedCheckpointId > cancelledId ||
(currentReceivedCheckpointId == cancelledId && numBarriersReceived == 0)) {
Review comment:
My intention to not rely on `isCheckpointPending()` for new methods,
also tried to get ride of it in previous core codes and make
`isCheckpointPending()` only for test purpose.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -396,21 +364,55 @@ synchronized void resetReceivedBarriers(long
checkpointId) {
return allBarriersReceivedFuture;
}
- synchronized void onChannelClosed() {
+ synchronized void onChannelClosed() throws IOException {
numOpenChannels--;
+
+ if (resetPendingCheckpoint()) {
+ handler.notifyAbort(
+ currentReceivedCheckpointId,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ }
+ }
+
+ synchronized boolean setCancelledCheckpointId(long cancelledId)
{
+ if (currentReceivedCheckpointId > cancelledId ||
(currentReceivedCheckpointId == cancelledId && numBarriersReceived == 0)) {
+ return false;
+ }
+
+ resetPendingCheckpoint();
+ currentReceivedCheckpointId = cancelledId;
+ return true;
+ }
+
+ synchronized void tryAbortPendingCheckpoint(long checkpointId,
CheckpointException exception) throws IOException {
+ if (checkpointId > currentReceivedCheckpointId &&
resetPendingCheckpoint()) {
+
handler.notifyAbort(currentReceivedCheckpointId, exception);
+ }
}
- synchronized void setCurrentReceivedCheckpointId(long
currentReceivedCheckpointId) {
- this.currentReceivedCheckpointId =
Math.max(currentReceivedCheckpointId, this.currentReceivedCheckpointId);
+ private boolean resetPendingCheckpoint() {
+ if (numBarriersReceived == 0) {
Review comment:
ditto
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -396,21 +364,55 @@ synchronized void resetReceivedBarriers(long
checkpointId) {
return allBarriersReceivedFuture;
}
- synchronized void onChannelClosed() {
+ synchronized void onChannelClosed() throws IOException {
numOpenChannels--;
+
+ if (resetPendingCheckpoint()) {
+ handler.notifyAbort(
+ currentReceivedCheckpointId,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ }
+ }
+
+ synchronized boolean setCancelledCheckpointId(long cancelledId)
{
+ if (currentReceivedCheckpointId > cancelledId ||
(currentReceivedCheckpointId == cancelledId && numBarriersReceived == 0)) {
+ return false;
+ }
+
+ resetPendingCheckpoint();
+ currentReceivedCheckpointId = cancelledId;
+ return true;
Review comment:
I guess not, because if `numBarriersReceived == 0` the
`resetPendingCheckpoint` will return false, but actually we also need to call
abort as long as the canceled id is larger than current checkpoint id.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -396,21 +364,55 @@ synchronized void resetReceivedBarriers(long
checkpointId) {
return allBarriersReceivedFuture;
}
- synchronized void onChannelClosed() {
+ synchronized void onChannelClosed() throws IOException {
numOpenChannels--;
+
+ if (resetPendingCheckpoint()) {
+ handler.notifyAbort(
+ currentReceivedCheckpointId,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ }
+ }
+
+ synchronized boolean setCancelledCheckpointId(long cancelledId)
{
+ if (currentReceivedCheckpointId > cancelledId ||
(currentReceivedCheckpointId == cancelledId && numBarriersReceived == 0)) {
Review comment:
I think it also makes sense to use `isCheckpointPending()` considering
this comment reason
https://github.com/apache/flink/pull/12460#discussion_r437217966.
Maybe I want to refactor it next time when touching this code, since I do
not want to re-execute the azure to block something. :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]