rkhachatryan commented on a change in pull request #8693:
URL: https://github.com/apache/flink/pull/8693#discussion_r426292974



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
##########
@@ -38,4 +38,12 @@
         * @throws Exception
         */
        void notifyCheckpointComplete(long checkpointId) throws Exception;
+
+       /**
+        * This method is called as a notification once a distributed 
checkpoint has been aborted.
+        *
+        * @param checkpointId The ID of the checkpoint that has been aborted.
+        * @throws Exception
+        */
+       void notifyCheckpointAborted(long checkpointId) throws Exception;

Review comment:
       I think this should be reflected in the release notes (through the 
corresponding field in the Jira issue).
   @pnowojski, WDYT?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
##########
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
+import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integrated tests to verify the logic to notify checkpoint aborted via RPC 
message.
+ */
+@RunWith(Parameterized.class)
+public class NotifyCheckpointAbortedITCase extends TestLogger {
+       private static final long SKIPPED_CHECKPOINT_ID = 1L;
+       private static final long ABORTED_CHECKPOINT_ID = 2L;
+       private static final long TEST_TIMEOUT = 60000;
+       private static final String StuckAsyncCheckpointMapName = 
"StuckAsyncCheckpointMap";
+       private static MiniClusterWithClientResource cluster;
+
+       private static Path checkpointPath;
+       private static String localRecoveryFolder;
+       private static StuckAsyncSnapshotStrategy stuckAsyncSnapshotStrategy;
+
+       @Parameterized.Parameter
+       public boolean unalignedCheckpointEnabled;
+
+       @Parameterized.Parameters(name = "unalignedCheckpointEnabled ={0}")
+       public static Collection<Boolean> parameter() {
+               return Arrays.asList(true, false);
+       }
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+       @Before
+       public void setup() throws Exception {
+               Configuration configuration = new Configuration();
+               configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, 
true);
+               localRecoveryFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+               
configuration.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS,
 localRecoveryFolder);
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
TestingHAFactory.class.getName());
+
+               checkpointPath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
+               cluster = new MiniClusterWithClientResource(
+                       new MiniClusterResourceConfiguration.Builder()
+                               .setConfiguration(configuration)
+                               .setNumberTaskManagers(1)
+                               .setNumberSlotsPerTaskManager(1).build());
+               cluster.before();
+
+               stuckAsyncSnapshotStrategy = new StuckAsyncSnapshotStrategy();
+               TestingCompletedCheckpointStore.addCheckpointLatch.reset();
+               TestingCompletedCheckpointStore.abortCheckpointLatch.reset();
+               StuckAsyncCheckpointMap.checkpointAbortedLatch.reset();
+               BeforeExecuteCheckpointSink.notifiedCheckpointLatch.reset();
+               BeforeExecuteCheckpointSink.snapshotIds.clear();
+       }
+
+       @After
+       public void shutdown() {
+               if (cluster != null) {
+                       cluster.after();
+                       cluster = null;
+               }
+
+       }
+
+       /**
+        * Verify operator at different phase of checkpoint could act as 
expected when notified of checkpoint abortion.
+        *
+        * <p>The job would run with at least two checkpoints. The 1st 
checkpoint would fail to add checkpoint to store,
+        * and we verify all local states stored in 1st checkpoint would then 
discarded. The 2nd checkpoint would decline
+        * by 'DeclineSink'. Then we verify the async runnable future of 
'StuckAsyncCheckpointMap' was canceled as expected,
+        * and 'BeforeExecuteCheckpointSink' did not execute the sync phase of 
checkpoint.
+        *
+        * <p>The job graph looks like:
+        * NormalSource --> keyBy --> NormalMap --> StuckAsyncCheckpointMap --> 
DeclineSink
+        *                                  |
+        *                                  |--> BarrierDelayMap -> 
BeforeExecuteCheckpointSink
+        */
+       @Test(timeout = TEST_TIMEOUT)
+       public void testNotifyCheckpointAborted() throws Exception {
+               Duration timeout = Duration.ofMillis(TEST_TIMEOUT);
+               Deadline deadline = Deadline.now().plus(timeout);
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);
+               
env.getCheckpointConfig().enableUnalignedCheckpoints(unalignedCheckpointEnabled);
+               
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
+               env.disableOperatorChaining();
+               env.setParallelism(1);
+
+               final StateBackend notifiedStateBackend = new 
NotifiedStateBackend(checkpointPath);
+               env.setStateBackend(notifiedStateBackend);
+
+               SingleOutputStreamOperator<Integer> normalMapStream = env
+                       .addSource(new NormalSource()).name("NormalSource")
+                       .keyBy((KeySelector<Tuple2<Integer, Integer>, Integer>) 
value -> value.f0)
+                       .map(new NormalMapFunction()).name("NormalMap");
+
+               normalMapStream.
+                       transform(StuckAsyncCheckpointMapName, 
TypeInformation.of(Integer.class), new StuckAsyncCheckpointMap())
+                       .addSink(new DeclineSink()).name("DeclineSink");
+
+               normalMapStream
+                       .transform("BarrierDelayMap", 
TypeInformation.of(Integer.class), new BarrierDelayMap())
+                       .transform("BeforeExecuteCheckpointSink", 
TypeInformation.of(Object.class), new BeforeExecuteCheckpointSink());
+
+               final ClusterClient<?> clusterClient = 
cluster.getClusterClient();
+               JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+               JobID jobID = jobGraph.getJobID();
+
+               ClientUtils.submitJob(clusterClient, jobGraph);
+
+               
TestingCompletedCheckpointStore.addCheckpointLatch.await(deadline.timeLeft().toMillis(),
 TimeUnit.MILLISECONDS);
+
+               Set<File> localStoredStates = new HashSet<>();
+               while (deadline.hasTimeLeft()) {
+                       // found the local state manager has stored states for 
checkpoint-1.
+                       localStoredStates.addAll(collectLocalStoredStates());
+                       if (!localStoredStates.isEmpty()) {
+                               break;
+                       }
+               }
+
+               // let the checkpoint-1 failed finally.
+               TestingCompletedCheckpointStore.abortCheckpointLatch.trigger();
+               while (deadline.hasTimeLeft()) {
+                       // verify the local state manager has been cleaned up 
once notified as checkpoint-1 aborted.
+                       localStoredStates.removeIf(file -> !file.exists());
+                       if (localStoredStates.isEmpty()) {
+                               break;
+                       }
+               }
+
+               // wait for StuckAsyncCheckpointMap notified as checkpoint 
aborted.
+               
StuckAsyncCheckpointMap.checkpointAbortedLatch.await(deadline.timeLeft().toMillis(),
 TimeUnit.MILLISECONDS);
+               
assertTrue(stuckAsyncSnapshotStrategy.blockingRunnableFuture.isCancelled());
+
+               // verify BeforeExecuteCheckpointSink never execute 
checkpoint-1.
+               while (deadline.hasTimeLeft()) {

Review comment:
       Ditto: this check could pass just because the deadline has run out.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
##########
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
+import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integrated tests to verify the logic to notify checkpoint aborted via RPC 
message.
+ */
+@RunWith(Parameterized.class)
+public class NotifyCheckpointAbortedITCase extends TestLogger {
+       private static final long SKIPPED_CHECKPOINT_ID = 1L;
+       private static final long ABORTED_CHECKPOINT_ID = 2L;
+       private static final long TEST_TIMEOUT = 60000;
+       private static final String StuckAsyncCheckpointMapName = 
"StuckAsyncCheckpointMap";
+       private static MiniClusterWithClientResource cluster;
+
+       private static Path checkpointPath;
+       private static String localRecoveryFolder;
+       private static StuckAsyncSnapshotStrategy stuckAsyncSnapshotStrategy;
+
+       @Parameterized.Parameter
+       public boolean unalignedCheckpointEnabled;
+
+       @Parameterized.Parameters(name = "unalignedCheckpointEnabled ={0}")
+       public static Collection<Boolean> parameter() {
+               return Arrays.asList(true, false);
+       }
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+       @Before
+       public void setup() throws Exception {
+               Configuration configuration = new Configuration();
+               configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, 
true);
+               localRecoveryFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+               
configuration.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS,
 localRecoveryFolder);
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
TestingHAFactory.class.getName());
+
+               checkpointPath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
+               cluster = new MiniClusterWithClientResource(
+                       new MiniClusterResourceConfiguration.Builder()
+                               .setConfiguration(configuration)
+                               .setNumberTaskManagers(1)
+                               .setNumberSlotsPerTaskManager(1).build());
+               cluster.before();
+
+               stuckAsyncSnapshotStrategy = new StuckAsyncSnapshotStrategy();
+               TestingCompletedCheckpointStore.addCheckpointLatch.reset();
+               TestingCompletedCheckpointStore.abortCheckpointLatch.reset();
+               StuckAsyncCheckpointMap.checkpointAbortedLatch.reset();
+               BeforeExecuteCheckpointSink.notifiedCheckpointLatch.reset();
+               BeforeExecuteCheckpointSink.snapshotIds.clear();
+       }
+
+       @After
+       public void shutdown() {
+               if (cluster != null) {
+                       cluster.after();
+                       cluster = null;
+               }
+
+       }
+
+       /**
+        * Verify operator at different phase of checkpoint could act as 
expected when notified of checkpoint abortion.
+        *
+        * <p>The job would run with at least two checkpoints. The 1st 
checkpoint would fail to add checkpoint to store,
+        * and we verify all local states stored in 1st checkpoint would then 
discarded. The 2nd checkpoint would decline
+        * by 'DeclineSink'. Then we verify the async runnable future of 
'StuckAsyncCheckpointMap' was canceled as expected,
+        * and 'BeforeExecuteCheckpointSink' did not execute the sync phase of 
checkpoint.
+        *
+        * <p>The job graph looks like:
+        * NormalSource --> keyBy --> NormalMap --> StuckAsyncCheckpointMap --> 
DeclineSink
+        *                                  |
+        *                                  |--> BarrierDelayMap -> 
BeforeExecuteCheckpointSink
+        */
+       @Test(timeout = TEST_TIMEOUT)
+       public void testNotifyCheckpointAborted() throws Exception {
+               Duration timeout = Duration.ofMillis(TEST_TIMEOUT);
+               Deadline deadline = Deadline.now().plus(timeout);
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);
+               
env.getCheckpointConfig().enableUnalignedCheckpoints(unalignedCheckpointEnabled);
+               
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
+               env.disableOperatorChaining();
+               env.setParallelism(1);
+
+               final StateBackend notifiedStateBackend = new 
NotifiedStateBackend(checkpointPath);
+               env.setStateBackend(notifiedStateBackend);
+
+               SingleOutputStreamOperator<Integer> normalMapStream = env
+                       .addSource(new NormalSource()).name("NormalSource")
+                       .keyBy((KeySelector<Tuple2<Integer, Integer>, Integer>) 
value -> value.f0)
+                       .map(new NormalMapFunction()).name("NormalMap");
+
+               normalMapStream.
+                       transform(StuckAsyncCheckpointMapName, 
TypeInformation.of(Integer.class), new StuckAsyncCheckpointMap())
+                       .addSink(new DeclineSink()).name("DeclineSink");
+
+               normalMapStream
+                       .transform("BarrierDelayMap", 
TypeInformation.of(Integer.class), new BarrierDelayMap())
+                       .transform("BeforeExecuteCheckpointSink", 
TypeInformation.of(Object.class), new BeforeExecuteCheckpointSink());
+
+               final ClusterClient<?> clusterClient = 
cluster.getClusterClient();
+               JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+               JobID jobID = jobGraph.getJobID();
+
+               ClientUtils.submitJob(clusterClient, jobGraph);
+
+               
TestingCompletedCheckpointStore.addCheckpointLatch.await(deadline.timeLeft().toMillis(),
 TimeUnit.MILLISECONDS);
+
+               Set<File> localStoredStates = new HashSet<>();
+               while (deadline.hasTimeLeft()) {
+                       // found the local state manager has stored states for 
checkpoint-1.
+                       localStoredStates.addAll(collectLocalStoredStates());
+                       if (!localStoredStates.isEmpty()) {
+                               break;
+                       }
+               }
+
+               // let the checkpoint-1 failed finally.
+               TestingCompletedCheckpointStore.abortCheckpointLatch.trigger();
+               while (deadline.hasTimeLeft()) {
+                       // verify the local state manager has been cleaned up 
once notified as checkpoint-1 aborted.
+                       localStoredStates.removeIf(file -> !file.exists());
+                       if (localStoredStates.isEmpty()) {
+                               break;
+                       }
+               }
+
+               // wait for StuckAsyncCheckpointMap notified as checkpoint 
aborted.
+               
StuckAsyncCheckpointMap.checkpointAbortedLatch.await(deadline.timeLeft().toMillis(),
 TimeUnit.MILLISECONDS);
+               
assertTrue(stuckAsyncSnapshotStrategy.blockingRunnableFuture.isCancelled());
+
+               // verify BeforeExecuteCheckpointSink never execute 
checkpoint-1.
+               while (deadline.hasTimeLeft()) {
+                       if (!BeforeExecuteCheckpointSink.snapshotIds.isEmpty()) 
{

Review comment:
       Can we use accumulators instead of a static field?
   It could save us from the sync/wait and improve readability (no static field).

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
##########
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
+import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integrated tests to verify the logic to notify checkpoint aborted via RPC 
message.
+ */
+@RunWith(Parameterized.class)
+public class NotifyCheckpointAbortedITCase extends TestLogger {
+       private static final long SKIPPED_CHECKPOINT_ID = 1L;
+       private static final long ABORTED_CHECKPOINT_ID = 2L;
+       private static final long TEST_TIMEOUT = 60000;
+       private static final String StuckAsyncCheckpointMapName = 
"StuckAsyncCheckpointMap";
+       private static MiniClusterWithClientResource cluster;
+
+       private static Path checkpointPath;
+       private static String localRecoveryFolder;
+       private static StuckAsyncSnapshotStrategy stuckAsyncSnapshotStrategy;
+
+       @Parameterized.Parameter
+       public boolean unalignedCheckpointEnabled;
+
+       @Parameterized.Parameters(name = "unalignedCheckpointEnabled ={0}")
+       public static Collection<Boolean> parameter() {
+               return Arrays.asList(true, false);
+       }
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+       @Before
+       public void setup() throws Exception {
+               Configuration configuration = new Configuration();
+               configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, 
true);
+               localRecoveryFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+               
configuration.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS,
 localRecoveryFolder);
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
TestingHAFactory.class.getName());
+
+               checkpointPath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
+               cluster = new MiniClusterWithClientResource(
+                       new MiniClusterResourceConfiguration.Builder()
+                               .setConfiguration(configuration)
+                               .setNumberTaskManagers(1)
+                               .setNumberSlotsPerTaskManager(1).build());
+               cluster.before();
+
+               stuckAsyncSnapshotStrategy = new StuckAsyncSnapshotStrategy();
+               TestingCompletedCheckpointStore.addCheckpointLatch.reset();
+               TestingCompletedCheckpointStore.abortCheckpointLatch.reset();
+               StuckAsyncCheckpointMap.checkpointAbortedLatch.reset();
+               BeforeExecuteCheckpointSink.notifiedCheckpointLatch.reset();
+               BeforeExecuteCheckpointSink.snapshotIds.clear();
+       }
+
+       @After
+       public void shutdown() {
+               if (cluster != null) {
+                       cluster.after();
+                       cluster = null;
+               }
+
+       }
+
+       /**
+        * Verify operator at different phase of checkpoint could act as 
expected when notified of checkpoint abortion.
+        *
+        * <p>The job would run with at least two checkpoints. The 1st 
checkpoint would fail to add checkpoint to store,
+        * and we verify all local states stored in 1st checkpoint would then 
discarded. The 2nd checkpoint would decline
+        * by 'DeclineSink'. Then we verify the async runnable future of 
'StuckAsyncCheckpointMap' was canceled as expected,
+        * and 'BeforeExecuteCheckpointSink' did not execute the sync phase of 
checkpoint.
+        *
+        * <p>The job graph looks like:
+        * NormalSource --> keyBy --> NormalMap --> StuckAsyncCheckpointMap --> 
DeclineSink
+        *                                  |
+        *                                  |--> BarrierDelayMap -> 
BeforeExecuteCheckpointSink
+        */
+       @Test(timeout = TEST_TIMEOUT)
+       public void testNotifyCheckpointAborted() throws Exception {
+               Duration timeout = Duration.ofMillis(TEST_TIMEOUT);
+               Deadline deadline = Deadline.now().plus(timeout);
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);
+               
env.getCheckpointConfig().enableUnalignedCheckpoints(unalignedCheckpointEnabled);
+               
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
+               env.disableOperatorChaining();
+               env.setParallelism(1);
+
+               final StateBackend notifiedStateBackend = new 
NotifiedStateBackend(checkpointPath);
+               env.setStateBackend(notifiedStateBackend);
+
+               SingleOutputStreamOperator<Integer> normalMapStream = env
+                       .addSource(new NormalSource()).name("NormalSource")
+                       .keyBy((KeySelector<Tuple2<Integer, Integer>, Integer>) 
value -> value.f0)
+                       .map(new NormalMapFunction()).name("NormalMap");
+
+               normalMapStream.
+                       transform(StuckAsyncCheckpointMapName, 
TypeInformation.of(Integer.class), new StuckAsyncCheckpointMap())
+                       .addSink(new DeclineSink()).name("DeclineSink");
+
+               normalMapStream
+                       .transform("BarrierDelayMap", 
TypeInformation.of(Integer.class), new BarrierDelayMap())
+                       .transform("BeforeExecuteCheckpointSink", 
TypeInformation.of(Object.class), new BeforeExecuteCheckpointSink());
+
+               final ClusterClient<?> clusterClient = 
cluster.getClusterClient();
+               JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+               JobID jobID = jobGraph.getJobID();
+
+               ClientUtils.submitJob(clusterClient, jobGraph);
+
+               
TestingCompletedCheckpointStore.addCheckpointLatch.await(deadline.timeLeft().toMillis(),
 TimeUnit.MILLISECONDS);
+
+               Set<File> localStoredStates = new HashSet<>();
+               while (deadline.hasTimeLeft()) {
+                       // found the local state manager has stored states for 
checkpoint-1.
+                       localStoredStates.addAll(collectLocalStoredStates());
+                       if (!localStoredStates.isEmpty()) {
+                               break;
+                       }
+               }
+
+               // let the checkpoint-1 failed finally.
+               TestingCompletedCheckpointStore.abortCheckpointLatch.trigger();
+               while (deadline.hasTimeLeft()) {

Review comment:
       I think this check will pass if the deadline is missed. Should it fail 
instead?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
##########
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
+import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integrated tests to verify the logic to notify checkpoint aborted via RPC 
message.
+ */
+@RunWith(Parameterized.class)
+public class NotifyCheckpointAbortedITCase extends TestLogger {
+       private static final long SKIPPED_CHECKPOINT_ID = 1L;
+       private static final long ABORTED_CHECKPOINT_ID = 2L;
+       private static final long TEST_TIMEOUT = 60000;
+       private static final String StuckAsyncCheckpointMapName = 
"StuckAsyncCheckpointMap";
+       private static MiniClusterWithClientResource cluster;
+
+       private static Path checkpointPath;
+       private static String localRecoveryFolder;
+       private static StuckAsyncSnapshotStrategy stuckAsyncSnapshotStrategy;
+
+       @Parameterized.Parameter
+       public boolean unalignedCheckpointEnabled;
+
+       @Parameterized.Parameters(name = "unalignedCheckpointEnabled ={0}")
+       public static Collection<Boolean> parameter() {
+               return Arrays.asList(true, false);
+       }
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+       @Before
+       public void setup() throws Exception {
+               Configuration configuration = new Configuration();
+               configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, 
true);
+               localRecoveryFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+               
configuration.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS,
 localRecoveryFolder);
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
TestingHAFactory.class.getName());
+
+               checkpointPath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
+               cluster = new MiniClusterWithClientResource(
+                       new MiniClusterResourceConfiguration.Builder()
+                               .setConfiguration(configuration)
+                               .setNumberTaskManagers(1)
+                               .setNumberSlotsPerTaskManager(1).build());
+               cluster.before();
+
+               stuckAsyncSnapshotStrategy = new StuckAsyncSnapshotStrategy();
+               TestingCompletedCheckpointStore.addCheckpointLatch.reset();
+               TestingCompletedCheckpointStore.abortCheckpointLatch.reset();
+               StuckAsyncCheckpointMap.checkpointAbortedLatch.reset();
+               BeforeExecuteCheckpointSink.notifiedCheckpointLatch.reset();
+               BeforeExecuteCheckpointSink.snapshotIds.clear();
+       }
+
+       @After
+       public void shutdown() {
+               if (cluster != null) {
+                       cluster.after();
+                       cluster = null;
+               }
+
+       }
+
+       /**
+        * Verify operator at different phase of checkpoint could act as 
expected when notified of checkpoint abortion.
+        *
+        * <p>The job would run with at least two checkpoints. The 1st 
checkpoint would fail to add checkpoint to store,
+        * and we verify all local states stored in 1st checkpoint would then 
discarded. The 2nd checkpoint would decline
+        * by 'DeclineSink'. Then we verify the async runnable future of 
'StuckAsyncCheckpointMap' was canceled as expected,
+        * and 'BeforeExecuteCheckpointSink' did not execute the sync phase of 
checkpoint.
+        *
+        * <p>The job graph looks like:
+        * NormalSource --> keyBy --> NormalMap --> StuckAsyncCheckpointMap --> 
DeclineSink
+        *                                  |
+        *                                  |--> BarrierDelayMap -> 
BeforeExecuteCheckpointSink
+        */
+       @Test(timeout = TEST_TIMEOUT)
+       public void testNotifyCheckpointAborted() throws Exception {

Review comment:
       This test does a great job of testing the functionality :+1: 
   
   However, considering its complexity and the existing unit-test coverage, I think a valid 
trade-off would be to simply test that the operator is notified that the checkpoint 
is aborted.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to