tillrohrmann commented on a change in pull request #14948: URL: https://github.com/apache/flink/pull/14948#discussion_r587605723
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StopWithSavepointOperations.java ########## @@ -18,15 +18,22 @@ package org.apache.flink.runtime.checkpoint; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + /** - * {@code CheckpointScheduling} provides methods for starting and stopping the periodic scheduling - * of checkpoints. + * {@code StopWithSavepointOperations} provides methods for the stop-with-savepoint operation, such + * as starting and stopping the periodic scheduling of checkpoints, or triggering a savepoint. */ -public interface CheckpointScheduling { +public interface StopWithSavepointOperations { /** Starts the periodic scheduling if possible. */ void startCheckpointScheduler(); /** Stops the periodic scheduling if possible. */ void stopCheckpointScheduler(); + + CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint( Review comment: JavaDoc is missing. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ########## @@ -430,14 +430,30 @@ protected void onStart() throws JobMasterException { final TaskExecutionState taskExecutionState) { checkNotNull(taskExecutionState, "taskExecutionState"); - if (schedulerNG.updateTaskExecutionState(taskExecutionState)) { - return CompletableFuture.completedFuture(Acknowledge.get()); - } else { - return FutureUtils.completedExceptionally( - new ExecutionGraphException( - "The execution attempt " - + taskExecutionState.getID() - + " was not found.")); + try { + if (schedulerNG.updateTaskExecutionState(taskExecutionState)) { Review comment: In which cases does the `SchedulerNG.updateTaskExecutionState` throw an exception which we want to send back to the caller? To me this always looks like a bug, and thus it should be a fatal exception. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java ########## @@ -206,6 +234,14 @@ void goToFailing( ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable failureCause); + + /** Transitions into the {@link StopWithSavepoint} state. */ Review comment: JavaDoc is not complete. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.runtime.scheduler.SchedulerBase; +import org.apache.flink.runtime.scheduler.SchedulerUtils; +import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationHandlerImpl; +import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationManager; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * When a "stop with savepoint" operation (wait until savepoint has been created, then cancel job) + * is triggered on the {@link Executing} state, we transition into this state. This state is + * delegating the tracking of the stop with savepoint operation to the {@link + * StopWithSavepointOperationManager}, which is shared with {@link SchedulerBase}. 
+ */ +class StopWithSavepoint extends StateWithExecutionGraph implements StopWithSavepointOperations { + + private final CompletableFuture<String> operationCompletionFuture; + private final Context context; + private final ClassLoader userCodeClassLoader; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + @Nullable String targetDirectory, + boolean terminate) { + super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger); + this.context = context; + this.userCodeClassLoader = userCodeClassLoader; + + // to ensure that all disjoint subgraphs of a job finish successfully on savepoint creation, + // we track the job termination via all execution termination futures (FLINK-21030). + final CompletableFuture<Collection<ExecutionState>> executionTerminationsFuture = + SchedulerUtils.getCombinedExecutionTerminationFuture(executionGraph); + + final StopWithSavepointOperationManager stopWithSavepointOperationManager = + new StopWithSavepointOperationManager( + this, + new StopWithSavepointOperationHandlerImpl( + executionGraph.getJobID(), context, this, logger)); + + this.operationCompletionFuture = + stopWithSavepointOperationManager.trackStopWithSavepoint( + terminate, + targetDirectory, + executionTerminationsFuture, + context.getMainThreadExecutor()); + } + + @Override + public void cancel() { + // the canceling state will cancel the execution graph, which will fail the stop with + // savepoint operation. Review comment: Also this ensures that we don't accidentally complete the operation when being in another state. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointOperationManager.java ########## @@ -18,25 +18,37 @@ package org.apache.flink.runtime.scheduler.stopwithsavepoint; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +import javax.annotation.Nullable; + import java.util.Collection; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * {@code StopWithSavepointTerminationManager} fulfills the contract given by {@link Review comment: `StopWithSavepointTerminationManager` no longer exists. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java ########## @@ -112,4 +122,20 @@ private static CheckpointIDCounter createCheckpointIdCounter( CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws Exception { return recoveryFactory.createCheckpointIDCounter(jobId); } + + /** + * Returns a {@code CompletableFuture} collecting the termination states of all {@link Execution + * Executions} of the given {@link ExecutionGraph}. + * Review comment: `@param` is missing ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/GlobalFailureHandler.java ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +/** Interface for notifying components about global failures. */ +public interface GlobalFailureHandler { + /** Notify component about a global failure. */ Review comment: I think interfaces and their methods should get a proper JavaDoc. 
########## File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java ########## @@ -88,6 +121,177 @@ public void testGlobalFailoverCanRecoverState() throws Exception { env.execute(); } + private enum StopWithSavepointTestBehavior { + NO_FAILURE, + FAIL_ON_CHECKPOINT, + FAIL_ON_STOP, + FAIL_ON_FIRST_CHECKPOINT_ONLY + } + + @Test + public void testStopWithSavepointNoError() throws Exception { + StreamExecutionEnvironment env = getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE); + + DummySource.resetForParallelism(PARALLELISM); + + JobClient client = env.executeAsync(); + + DummySource.awaitRunning(); + + final File savepointDirectory = tempFolder.newFolder("savepoint"); + final String savepoint = + client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get(); + assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath())); + assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED)); + } + + @Test + public void testStopWithSavepointFailOnCheckpoint() throws Exception { + StreamExecutionEnvironment env = + getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L)); + + DummySource.resetForParallelism(PARALLELISM); + + JobClient client = env.executeAsync(); + + DummySource.awaitRunning(); + try { + client.stopWithSavepoint(false, tempFolder.newFolder("savepoint").getAbsolutePath()) + .get(); + fail("Expect exception"); + } catch (ExecutionException e) { + assertThat(e, containsCause(CheckpointException.class)); + } + // expect job to run again (maybe restart) + CommonTestUtils.waitUntilCondition( + () -> client.getJobStatus().get() == JobStatus.RUNNING, + Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES))); + } + + @Test + public void testStopWithSavepointFailOnStop() throws Exception { + StreamExecutionEnvironment env = + getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_STOP); + 
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L)); + + DummySource.resetForParallelism(PARALLELISM); + + JobClient client = env.executeAsync(); + + DummySource.awaitRunning(); + try { + client.stopWithSavepoint(false, tempFolder.newFolder("savepoint").getAbsolutePath()) + .get(); + fail("Expect exception"); + } catch (ExecutionException e) { + assertThat(e, containsCause(FlinkException.class)); + } + // expect job to run again (maybe restart) + CommonTestUtils.waitUntilCondition( + () -> client.getJobStatus().get() == JobStatus.RUNNING, + Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES))); + } + + @Test + public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Exception { Review comment: Are we testing here that we clean up after a first failed savepoint operation? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java ########## @@ -799,6 +799,37 @@ public void goToFailing( failureCause)); } + @Override + public CompletableFuture<String> goToStopWithSavepoint( + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + @Nullable String targetDirectory, + boolean advanceToEndOfEventTime) { + + transitionToState( + new StopWithSavepoint.Factory( + this, + executionGraph, + executionGraphHandler, + operatorCoordinatorHandler, + LOG, + userCodeClassLoader, + targetDirectory, + advanceToEndOfEventTime)); + + Optional<StopWithSavepoint> maybeStopWithSavepoint = state.as(StopWithSavepoint.class); Review comment: Could we let `transitionToState` return the new state? Then we don't have to do this casting here. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.runtime.scheduler.SchedulerBase; +import org.apache.flink.runtime.scheduler.SchedulerUtils; +import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationHandlerImpl; +import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationManager; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * When a "stop with savepoint" 
operation (wait until savepoint has been created, then cancel job) + * is triggered on the {@link Executing} state, we transition into this state. This state is + * delegating the tracking of the stop with savepoint operation to the {@link + * StopWithSavepointOperationManager}, which is shared with {@link SchedulerBase}. + */ +class StopWithSavepoint extends StateWithExecutionGraph implements StopWithSavepointOperations { + + private final CompletableFuture<String> operationCompletionFuture; + private final Context context; + private final ClassLoader userCodeClassLoader; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + @Nullable String targetDirectory, + boolean terminate) { + super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger); + this.context = context; + this.userCodeClassLoader = userCodeClassLoader; + + // to ensure that all disjoint subgraphs of a job finish successfully on savepoint creation, + // we track the job termination via all execution termination futures (FLINK-21030). + final CompletableFuture<Collection<ExecutionState>> executionTerminationsFuture = + SchedulerUtils.getCombinedExecutionTerminationFuture(executionGraph); + + final StopWithSavepointOperationManager stopWithSavepointOperationManager = + new StopWithSavepointOperationManager( + this, + new StopWithSavepointOperationHandlerImpl( + executionGraph.getJobID(), context, this, logger)); + + this.operationCompletionFuture = + stopWithSavepointOperationManager.trackStopWithSavepoint( + terminate, + targetDirectory, + executionTerminationsFuture, + context.getMainThreadExecutor()); + } + + @Override + public void cancel() { + // the canceling state will cancel the execution graph, which will fail the stop with + // savepoint operation. 
Review comment: I think the contract would be clearer if we stop the `stopWithSavepointOperationManager` when leaving this state. That way it is clear that this state is responsible for the stop with savepoint operation. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointOperationManager.java ########## @@ -45,18 +57,28 @@ public StopWithSavepointTerminationManager( * Enforces the correct completion order of the passed {@code CompletableFuture} instances in * accordance to the contract of {@link StopWithSavepointTerminationHandler}. * - * @param completedSavepointFuture The {@code CompletableFuture} of the savepoint creation step. + * @param terminate Flag indicating whether to terminate or suspend the job. + * @param targetDirectory Target for the savepoint. * @param terminatedExecutionStatesFuture The {@code CompletableFuture} of the termination step. * @param mainThreadExecutor The executor the {@code StopWithSavepointTerminationHandler} * operations run on. * @return A {@code CompletableFuture} containing the path to the created savepoint. */ - public CompletableFuture<String> stopWithSavepoint( - CompletableFuture<CompletedCheckpoint> completedSavepointFuture, + public CompletableFuture<String> trackStopWithSavepoint( + boolean terminate, Review comment: Does it make sense to introduce an enum to give this flag a bit more expressive power? For example, when only looking at the code, if one passes `false` it is not very clear to me what this means. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ########## @@ -941,69 +940,39 @@ public void reportCheckpointMetrics( @Override public CompletableFuture<String> stopWithSavepoint( - final String targetDirectory, final boolean terminate) { + @Nullable final String targetDirectory, final boolean terminate) { mainThreadExecutor.assertRunningInMainThread(); final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - if (checkpointCoordinator == null) { - return FutureUtils.completedExceptionally( - new IllegalStateException( - String.format("Job %s is not a streaming job.", jobGraph.getJobID()))); - } - - if (targetDirectory == null - && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) { - log.info( - "Trying to cancel job {} with savepoint, but no savepoint directory configured.", - jobGraph.getJobID()); - - return FutureUtils.completedExceptionally( - new IllegalStateException( - "No savepoint directory configured. You can either specify a directory " - + "while cancelling via -s :targetDirectory or configure a cluster-wide " - + "default via key '" - + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() - + "'.")); + Optional<IllegalStateException> argumentCheckException = + StopWithSavepointOperationManager.checkStopWithSavepointPreconditions( + checkpointCoordinator, targetDirectory, executionGraph.getJobID(), log); + if (argumentCheckException.isPresent()) { + return FutureUtils.completedExceptionally(argumentCheckException.get()); Review comment: I think it should be fine to simply throw the `IllegalStateException`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointOperationHandlerImpl.java ########## @@ -49,35 +48,34 @@ * <p>The implementation expects the savepoint creation being completed before the executions * terminate. 
* - * @see StopWithSavepointTerminationManager + * @see StopWithSavepointOperationManager */ -public class StopWithSavepointTerminationHandlerImpl - implements StopWithSavepointTerminationHandler { +public class StopWithSavepointOperationHandlerImpl implements StopWithSavepointTerminationHandler { private final Logger log; - private final SchedulerNG scheduler; - private final CheckpointScheduling checkpointScheduling; + private final GlobalFailureHandler globalFailureHandler; + private final StopWithSavepointOperations stopWithSavepointOperations; private final JobID jobId; private final CompletableFuture<String> result = new CompletableFuture<>(); private State state = new WaitingForSavepoint(); - public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl( - JobID jobId, S schedulerWithCheckpointing, Logger log) { + public <S extends GlobalFailureHandler & StopWithSavepointOperations> + StopWithSavepointOperationHandlerImpl( + JobID jobId, S schedulerWithCheckpointing, Logger log) { Review comment: Maybe update the parameter name. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.runtime.scheduler.SchedulerBase; +import org.apache.flink.runtime.scheduler.SchedulerUtils; +import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationHandlerImpl; +import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointOperationManager; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * When a "stop with savepoint" operation (wait until savepoint has been created, then cancel job) + * is triggered on the {@link Executing} state, we transition into this state. This state is + * delegating the tracking of the stop with savepoint operation to the {@link + * StopWithSavepointOperationManager}, which is shared with {@link SchedulerBase}. 
+ */ +class StopWithSavepoint extends StateWithExecutionGraph implements StopWithSavepointOperations { + + private final CompletableFuture<String> operationCompletionFuture; + private final Context context; + private final ClassLoader userCodeClassLoader; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + Logger logger, + ClassLoader userCodeClassLoader, + @Nullable String targetDirectory, + boolean terminate) { + super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger); + this.context = context; + this.userCodeClassLoader = userCodeClassLoader; + + // to ensure that all disjoint subgraphs of a job finish successfully on savepoint creation, + // we track the job termination via all execution termination futures (FLINK-21030). + final CompletableFuture<Collection<ExecutionState>> executionTerminationsFuture = + SchedulerUtils.getCombinedExecutionTerminationFuture(executionGraph); + + final StopWithSavepointOperationManager stopWithSavepointOperationManager = + new StopWithSavepointOperationManager( + this, + new StopWithSavepointOperationHandlerImpl( + executionGraph.getJobID(), context, this, logger)); + + this.operationCompletionFuture = + stopWithSavepointOperationManager.trackStopWithSavepoint( + terminate, + targetDirectory, + executionTerminationsFuture, + context.getMainThreadExecutor()); + } + + @Override + public void cancel() { + // the canceling state will cancel the execution graph, which will fail the stop with + // savepoint operation. 
+ context.goToCanceling( + getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler()); + } + + @Override + public JobStatus getJobStatus() { + return JobStatus.RUNNING; + } + + @Override + public void handleGlobalFailure(Throwable cause) { + handleAnyFailure(cause); + } + + /** + * The {@code executionTerminationsFuture} will complete if a task reached a terminal state, and + * {@link StopWithSavepointOperationManager} will act accordingly. + */ + @Override + boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) { + final boolean successfulUpdate = + getExecutionGraph().updateState(taskExecutionStateTransition); + + if (successfulUpdate) { + if (taskExecutionStateTransition.getExecutionState() == ExecutionState.FAILED) { + Throwable cause = taskExecutionStateTransition.getError(userCodeClassLoader); + handleAnyFailure(cause); + } + } + + return successfulUpdate; + } + + @Override + void onGloballyTerminalState(JobStatus globallyTerminalState) { + context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph())); + } + + private void handleAnyFailure(Throwable cause) { + final Executing.FailureResult failureResult = context.howToHandleFailure(cause); + + if (failureResult.canRestart()) { + context.goToRestarting( + getExecutionGraph(), + getExecutionGraphHandler(), + getOperatorCoordinatorHandler(), + failureResult.getBackoffTime()); + } else { + context.goToFailing( + getExecutionGraph(), + getExecutionGraphHandler(), + getOperatorCoordinatorHandler(), + failureResult.getFailureCause()); + } + } + + CompletableFuture<String> getOperationCompletionFuture() { + return operationCompletionFuture; + } + + @Override + public void startCheckpointScheduler() { + final CheckpointCoordinator coordinator = getExecutionGraph().getCheckpointCoordinator(); + if (coordinator == null) { + if (!getExecutionGraph().getState().isTerminalState()) { + // for a streaming job, the checkpoint coordinator is always set 
(even if periodic + // checkpoints are disabled). The only situation where it can be null is when the + // job reached a terminal state. + throw new IllegalStateException( + "Coordinator is only allowed to be null if we are in a terminal state."); + } + return; + } + if (coordinator.isPeriodicCheckpointingConfigured()) { + try { + coordinator.startCheckpointScheduler(); + } catch (IllegalStateException ignored) { + // Concurrent shut down of the coordinator Review comment: How can this happen if this method is called from the main thread? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java ########## @@ -143,6 +147,30 @@ public void notifyNewResourcesAvailable() { } } + CompletableFuture<String> stopWithSavepoint( + @Nullable final String targetDirectory, boolean terminate) { Review comment: In `SchedulerBase` the parameters are in a different order. I think we should try to be consistent. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org