tillrohrmann commented on a change in pull request #14948:
URL: https://github.com/apache/flink/pull/14948#discussion_r587605723



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StopWithSavepointOperations.java
##########
@@ -18,15 +18,22 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
 /**
- * {@code CheckpointScheduling} provides methods for starting and stopping the 
periodic scheduling
- * of checkpoints.
+ * {@code StopWithSavepointOperations} provides methods for the 
stop-with-savepoint operation, such
+ * as starting and stopping the periodic scheduling of checkpoints, or 
triggering a savepoint.
  */
-public interface CheckpointScheduling {
+public interface StopWithSavepointOperations {
 
     /** Starts the periodic scheduling if possible. */
     void startCheckpointScheduler();
 
     /** Stops the periodic scheduling if possible. */
     void stopCheckpointScheduler();
+
+    CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(

Review comment:
       JavaDoc is missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -430,14 +430,30 @@ protected void onStart() throws JobMasterException {
             final TaskExecutionState taskExecutionState) {
         checkNotNull(taskExecutionState, "taskExecutionState");
 
-        if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
-            return CompletableFuture.completedFuture(Acknowledge.get());
-        } else {
-            return FutureUtils.completedExceptionally(
-                    new ExecutionGraphException(
-                            "The execution attempt "
-                                    + taskExecutionState.getID()
-                                    + " was not found."));
+        try {
+            if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review comment:
       In which cases does `SchedulerNG.updateTaskExecutionState` throw an 
exception that we want to send back to the caller? To me, this always looks 
like a bug and should thus be treated as a fatal exception.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -206,6 +234,14 @@ void goToFailing(
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
                 Throwable failureCause);
+
+        /** Transitions into the {@link StopWithSavepoint} state. */

Review comment:
       The JavaDoc is incomplete: the parameters and the returned future should be documented as well.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationHandlerImpl;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationManager;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * When a "stop with savepoint" operation (wait until savepoint has been 
created, then cancel job)
+ * is triggered on the {@link Executing} state, we transition into this state. 
This state is
+ * delegating the tracking of the stop with savepoint operation to the {@link
+ * StopWithSavepointOperationManager}, which is shared with {@link 
SchedulerBase}.
+ */
+class StopWithSavepoint extends StateWithExecutionGraph implements 
StopWithSavepointOperations {
+
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final Context context;
+    private final ClassLoader userCodeClassLoader;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            @Nullable String targetDirectory,
+            boolean terminate) {
+        super(context, executionGraph, executionGraphHandler, 
operatorCoordinatorHandler, logger);
+        this.context = context;
+        this.userCodeClassLoader = userCodeClassLoader;
+
+        // to ensure that all disjoint subgraphs of a job finish successfully 
on savepoint creation,
+        // we track the job termination via all execution termination futures 
(FLINK-21030).
+        final CompletableFuture<Collection<ExecutionState>> 
executionTerminationsFuture =
+                
SchedulerUtils.getCombinedExecutionTerminationFuture(executionGraph);
+
+        final StopWithSavepointOperationManager 
stopWithSavepointOperationManager =
+                new StopWithSavepointOperationManager(
+                        this,
+                        new StopWithSavepointOperationHandlerImpl(
+                                executionGraph.getJobID(), context, this, 
logger));
+
+        this.operationCompletionFuture =
+                stopWithSavepointOperationManager.trackStopWithSavepoint(
+                        terminate,
+                        targetDirectory,
+                        executionTerminationsFuture,
+                        context.getMainThreadExecutor());
+    }
+
+    @Override
+    public void cancel() {
+        // the canceling state will cancel the execution graph, which will 
fail the stop with
+        // savepoint operation.

Review comment:
       This also ensures that we don't accidentally complete the operation while 
in another state.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointOperationManager.java
##########
@@ -18,25 +18,37 @@
 
 package org.apache.flink.runtime.scheduler.stopwithsavepoint;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
 import java.util.Collection;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * {@code StopWithSavepointTerminationManager} fulfills the contract given by 
{@link

Review comment:
       `StopWithSavepointTerminationManager` no longer exists.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
##########
@@ -112,4 +122,20 @@ private static CheckpointIDCounter 
createCheckpointIdCounter(
             CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws 
Exception {
         return recoveryFactory.createCheckpointIDCounter(jobId);
     }
+
+    /**
+     * Returns a {@code CompletableFuture} collecting the termination states 
of all {@link Execution
+     * Executions} of the given {@link ExecutionGraph}.
+     *

Review comment:
       The `@param` tag for the `ExecutionGraph` argument is missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/GlobalFailureHandler.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+/** Interface for notifying components about global failures. */
+public interface GlobalFailureHandler {
+    /** Notify component about a global failure. */

Review comment:
       I think interfaces and their methods should have proper JavaDoc.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -88,6 +121,177 @@ public void testGlobalFailoverCanRecoverState() throws 
Exception {
         env.execute();
     }
 
+    private enum StopWithSavepointTestBehavior {
+        NO_FAILURE,
+        FAIL_ON_CHECKPOINT,
+        FAIL_ON_STOP,
+        FAIL_ON_FIRST_CHECKPOINT_ONLY
+    }
+
+    @Test
+    public void testStopWithSavepointNoError() throws Exception {
+        StreamExecutionEnvironment env = 
getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE);
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+
+        final File savepointDirectory = tempFolder.newFolder("savepoint");
+        final String savepoint =
+                client.stopWithSavepoint(false, 
savepointDirectory.getAbsolutePath()).get();
+        assertThat(savepoint, 
containsString(savepointDirectory.getAbsolutePath()));
+        assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+        StreamExecutionEnvironment env =
+                
getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        try {
+            client.stopWithSavepoint(false, 
tempFolder.newFolder("savepoint").getAbsolutePath())
+                    .get();
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e, containsCause(CheckpointException.class));
+        }
+        // expect job to run again (maybe restart)
+        CommonTestUtils.waitUntilCondition(
+                () -> client.getJobStatus().get() == JobStatus.RUNNING,
+                Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnStop() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_STOP);
+        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        try {
+            client.stopWithSavepoint(false, 
tempFolder.newFolder("savepoint").getAbsolutePath())
+                    .get();
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e, containsCause(FlinkException.class));
+        }
+        // expect job to run again (maybe restart)
+        CommonTestUtils.waitUntilCondition(
+                () -> client.getJobStatus().get() == JobStatus.RUNNING,
+                Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() 
throws Exception {

Review comment:
       Are we testing here that we clean up after a first failed savepoint 
operation?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -799,6 +799,37 @@ public void goToFailing(
                         failureCause));
     }
 
+    @Override
+    public CompletableFuture<String> goToStopWithSavepoint(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            @Nullable String targetDirectory,
+            boolean advanceToEndOfEventTime) {
+
+        transitionToState(
+                new StopWithSavepoint.Factory(
+                        this,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        LOG,
+                        userCodeClassLoader,
+                        targetDirectory,
+                        advanceToEndOfEventTime));
+
+        Optional<StopWithSavepoint> maybeStopWithSavepoint = 
state.as(StopWithSavepoint.class);

Review comment:
       Could we let `transitionToState` return the new state? Then we don't 
have to do this casting here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationHandlerImpl;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationManager;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * When a "stop with savepoint" operation (wait until savepoint has been 
created, then cancel job)
+ * is triggered on the {@link Executing} state, we transition into this state. 
This state is
+ * delegating the tracking of the stop with savepoint operation to the {@link
+ * StopWithSavepointOperationManager}, which is shared with {@link 
SchedulerBase}.
+ */
+class StopWithSavepoint extends StateWithExecutionGraph implements 
StopWithSavepointOperations {
+
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final Context context;
+    private final ClassLoader userCodeClassLoader;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            @Nullable String targetDirectory,
+            boolean terminate) {
+        super(context, executionGraph, executionGraphHandler, 
operatorCoordinatorHandler, logger);
+        this.context = context;
+        this.userCodeClassLoader = userCodeClassLoader;
+
+        // to ensure that all disjoint subgraphs of a job finish successfully 
on savepoint creation,
+        // we track the job termination via all execution termination futures 
(FLINK-21030).
+        final CompletableFuture<Collection<ExecutionState>> 
executionTerminationsFuture =
+                
SchedulerUtils.getCombinedExecutionTerminationFuture(executionGraph);
+
+        final StopWithSavepointOperationManager 
stopWithSavepointOperationManager =
+                new StopWithSavepointOperationManager(
+                        this,
+                        new StopWithSavepointOperationHandlerImpl(
+                                executionGraph.getJobID(), context, this, 
logger));
+
+        this.operationCompletionFuture =
+                stopWithSavepointOperationManager.trackStopWithSavepoint(
+                        terminate,
+                        targetDirectory,
+                        executionTerminationsFuture,
+                        context.getMainThreadExecutor());
+    }
+
+    @Override
+    public void cancel() {
+        // the canceling state will cancel the execution graph, which will 
fail the stop with
+        // savepoint operation.

Review comment:
       I think the contract would be clearer if we stop the 
`stopWithSavepointOperationManager` when leaving this state. That way it is 
clear that this state is responsible for the stop-with-savepoint operation.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointOperationManager.java
##########
@@ -45,18 +57,28 @@ public StopWithSavepointTerminationManager(
      * Enforces the correct completion order of the passed {@code 
CompletableFuture} instances in
      * accordance to the contract of {@link 
StopWithSavepointTerminationHandler}.
      *
-     * @param completedSavepointFuture The {@code CompletableFuture} of the 
savepoint creation step.
+     * @param terminate Flag indicating whether to terminate or suspend the 
job.
+     * @param targetDirectory Target for the savepoint.
      * @param terminatedExecutionStatesFuture The {@code CompletableFuture} of 
the termination step.
      * @param mainThreadExecutor The executor the {@code 
StopWithSavepointTerminationHandler}
      *     operations run on.
      * @return A {@code CompletableFuture} containing the path to the created 
savepoint.
      */
-    public CompletableFuture<String> stopWithSavepoint(
-            CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
+    public CompletableFuture<String> trackStopWithSavepoint(
+            boolean terminate,

Review comment:
       Would it make sense to introduce an enum to give this flag more 
expressive power? For example, when only looking at the call site, it is not 
clear to me what passing `false` means.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -941,69 +940,39 @@ public void reportCheckpointMetrics(
 
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            final String targetDirectory, final boolean terminate) {
+            @Nullable final String targetDirectory, final boolean terminate) {
         mainThreadExecutor.assertRunningInMainThread();
 
         final CheckpointCoordinator checkpointCoordinator =
                 executionGraph.getCheckpointCoordinator();
 
-        if (checkpointCoordinator == null) {
-            return FutureUtils.completedExceptionally(
-                    new IllegalStateException(
-                            String.format("Job %s is not a streaming job.", 
jobGraph.getJobID())));
-        }
-
-        if (targetDirectory == null
-                && 
!checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
-            log.info(
-                    "Trying to cancel job {} with savepoint, but no savepoint 
directory configured.",
-                    jobGraph.getJobID());
-
-            return FutureUtils.completedExceptionally(
-                    new IllegalStateException(
-                            "No savepoint directory configured. You can either 
specify a directory "
-                                    + "while cancelling via -s 
:targetDirectory or configure a cluster-wide "
-                                    + "default via key '"
-                                    + 
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
-                                    + "'."));
+        Optional<IllegalStateException> argumentCheckException =
+                
StopWithSavepointOperationManager.checkStopWithSavepointPreconditions(
+                        checkpointCoordinator, targetDirectory, 
executionGraph.getJobID(), log);
+        if (argumentCheckException.isPresent()) {
+            return 
FutureUtils.completedExceptionally(argumentCheckException.get());

Review comment:
       I think it should be fine to simply throw the `IllegalStateException`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointOperationHandlerImpl.java
##########
@@ -49,35 +48,34 @@
  * <p>The implementation expects the savepoint creation being completed before 
the executions
  * terminate.
  *
- * @see StopWithSavepointTerminationManager
+ * @see StopWithSavepointOperationManager
  */
-public class StopWithSavepointTerminationHandlerImpl
-        implements StopWithSavepointTerminationHandler {
+public class StopWithSavepointOperationHandlerImpl implements 
StopWithSavepointTerminationHandler {
 
     private final Logger log;
 
-    private final SchedulerNG scheduler;
-    private final CheckpointScheduling checkpointScheduling;
+    private final GlobalFailureHandler globalFailureHandler;
+    private final StopWithSavepointOperations stopWithSavepointOperations;
     private final JobID jobId;
 
     private final CompletableFuture<String> result = new CompletableFuture<>();
 
     private State state = new WaitingForSavepoint();
 
-    public <S extends SchedulerNG & CheckpointScheduling> 
StopWithSavepointTerminationHandlerImpl(
-            JobID jobId, S schedulerWithCheckpointing, Logger log) {
+    public <S extends GlobalFailureHandler & StopWithSavepointOperations>
+            StopWithSavepointOperationHandlerImpl(
+                    JobID jobId, S schedulerWithCheckpointing, Logger log) {

Review comment:
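       Maybe update the parameter name `schedulerWithCheckpointing`: the argument only needs to be a `GlobalFailureHandler` and `StopWithSavepointOperations`, not necessarily a scheduler.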

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationHandlerImpl;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationManager;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * When a "stop with savepoint" operation (wait until savepoint has been 
created, then cancel job)
+ * is triggered on the {@link Executing} state, we transition into this state. 
This state is
+ * delegating the tracking of the stop with savepoint operation to the {@link
+ * StopWithSavepointOperationManager}, which is shared with {@link 
SchedulerBase}.
+ */
+class StopWithSavepoint extends StateWithExecutionGraph implements 
StopWithSavepointOperations {
+
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final Context context;
+    private final ClassLoader userCodeClassLoader;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            @Nullable String targetDirectory,
+            boolean terminate) {
+        super(context, executionGraph, executionGraphHandler, 
operatorCoordinatorHandler, logger);
+        this.context = context;
+        this.userCodeClassLoader = userCodeClassLoader;
+
+        // to ensure that all disjoint subgraphs of a job finish successfully 
on savepoint creation,
+        // we track the job termination via all execution termination futures 
(FLINK-21030).
+        final CompletableFuture<Collection<ExecutionState>> 
executionTerminationsFuture =
+                
SchedulerUtils.getCombinedExecutionTerminationFuture(executionGraph);
+
+        final StopWithSavepointOperationManager 
stopWithSavepointOperationManager =
+                new StopWithSavepointOperationManager(
+                        this,
+                        new StopWithSavepointOperationHandlerImpl(
+                                executionGraph.getJobID(), context, this, 
logger));
+
+        this.operationCompletionFuture =
+                stopWithSavepointOperationManager.trackStopWithSavepoint(
+                        terminate,
+                        targetDirectory,
+                        executionTerminationsFuture,
+                        context.getMainThreadExecutor());
+    }
+
+    @Override
+    public void cancel() {
+        // the canceling state will cancel the execution graph, which will 
fail the stop with
+        // savepoint operation.
+        context.goToCanceling(
+                getExecutionGraph(), getExecutionGraphHandler(), 
getOperatorCoordinatorHandler());
+    }
+
+    @Override
+    public JobStatus getJobStatus() {
+        return JobStatus.RUNNING;
+    }
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        handleAnyFailure(cause);
+    }
+
+    /**
+     * The {@code executionTerminationsFuture} will complete if a task reached 
a terminal state, and
+     * {@link StopWithSavepointOperationManager} will act accordingly.
+     */
+    @Override
+    boolean updateTaskExecutionState(TaskExecutionStateTransition 
taskExecutionStateTransition) {
+        final boolean successfulUpdate =
+                getExecutionGraph().updateState(taskExecutionStateTransition);
+
+        if (successfulUpdate) {
+            if (taskExecutionStateTransition.getExecutionState() == 
ExecutionState.FAILED) {
+                Throwable cause = 
taskExecutionStateTransition.getError(userCodeClassLoader);
+                handleAnyFailure(cause);
+            }
+        }
+
+        return successfulUpdate;
+    }
+
+    @Override
+    void onGloballyTerminalState(JobStatus globallyTerminalState) {
+        
context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
+    }
+
+    private void handleAnyFailure(Throwable cause) {
+        final Executing.FailureResult failureResult = 
context.howToHandleFailure(cause);
+
+        if (failureResult.canRestart()) {
+            context.goToRestarting(
+                    getExecutionGraph(),
+                    getExecutionGraphHandler(),
+                    getOperatorCoordinatorHandler(),
+                    failureResult.getBackoffTime());
+        } else {
+            context.goToFailing(
+                    getExecutionGraph(),
+                    getExecutionGraphHandler(),
+                    getOperatorCoordinatorHandler(),
+                    failureResult.getFailureCause());
+        }
+    }
+
+    CompletableFuture<String> getOperationCompletionFuture() {
+        return operationCompletionFuture;
+    }
+
+    @Override
+    public void startCheckpointScheduler() {
+        final CheckpointCoordinator coordinator = 
getExecutionGraph().getCheckpointCoordinator();
+        if (coordinator == null) {
+            if (!getExecutionGraph().getState().isTerminalState()) {
+                // for a streaming job, the checkpoint coordinator is always 
set (even if periodic
+                // checkpoints are disabled). The only situation where it can 
be null is when the
+                // job reached a terminal state.
+                throw new IllegalStateException(
+                        "Coordinator is only allowed to be null if we are in a 
terminal state.");
+            }
+            return;
+        }
+        if (coordinator.isPeriodicCheckpointingConfigured()) {
+            try {
+                coordinator.startCheckpointScheduler();
+            } catch (IllegalStateException ignored) {
+                // Concurrent shut down of the coordinator

Review comment:
       How can this happen if this method is called from the main thread?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -143,6 +147,30 @@ public void notifyNewResourcesAvailable() {
         }
     }
 
+    CompletableFuture<String> stopWithSavepoint(
+            @Nullable final String targetDirectory, boolean terminate) {

Review comment:
       In `SchedulerBase` the parameters are in a different order. I think we 
should try to be consistent.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to