tillrohrmann commented on a change in pull request #14948: URL: https://github.com/apache/flink/pull/14948#discussion_r596805493
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +/** + * Tracks a "stop with savepoint" operation. 
The incoming "savepointFuture" is coming from the + * {@link CheckpointCoordinator}, which takes care of triggering a savepoint, and then shutting down + * the job (on success). + * + * <p>This state is tracking the future to act accordingly on it. The savepoint path (= the result + * of the operation) is made available via the "operationFuture" to the user. This operation is only + * considered successfully if the "savepointFuture" completed successfully, and the job reached the + * terminal state FINISHED. + */ +class StopWithSavepoint extends StateWithExecutionGraph { + + private final Context context; + private final ClassLoader userCodeClassLoader; + + private final CompletableFuture<String> operationFuture; + + private final CheckpointScheduling checkpointScheduling; + + private boolean hasFullyFinished = false; + + @Nullable private String savepoint = null; + + @Nullable private Throwable operationFailureCause; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + CheckpointScheduling checkpointScheduling, + Logger logger, + ClassLoader userCodeClassLoader, + CompletableFuture<String> savepointFuture) { + super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger); + this.context = context; + this.userCodeClassLoader = userCodeClassLoader; + this.checkpointScheduling = checkpointScheduling; + this.operationFuture = new CompletableFuture<>(); + + FutureUtils.assertNoException( + savepointFuture.handle( + (savepointLocation, throwable) -> { + context.runIfState( + this, + () -> handleSavepointCompletion(savepointLocation, throwable), + Duration.ZERO); Review comment: Maybe add a comment stating that we need this in order to not execute the callback within the constructor. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +/** + * Tracks a "stop with savepoint" operation. 
The incoming "savepointFuture" is coming from the + * {@link CheckpointCoordinator}, which takes care of triggering a savepoint, and then shutting down + * the job (on success). + * + * <p>This state is tracking the future to act accordingly on it. The savepoint path (= the result + * of the operation) is made available via the "operationFuture" to the user. This operation is only + * considered successfully if the "savepointFuture" completed successfully, and the job reached the + * terminal state FINISHED. + */ +class StopWithSavepoint extends StateWithExecutionGraph { Review comment: Where are we stopping the `CheckpointCoordinator`? I guess we are lacking test coverage for this. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); Review comment: Are you sure that we don't have the same ordering as in `testJobFinishedBeforeSavepointFuture`? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testJobFailed() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getExecutionGraph().getState(), + is(JobStatus.FAILED)); + assertThat( + failingArguments.getFailureCause(), + containsCause(FlinkException.class)); + }); + + // fail job: + mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); + } + } + + @Test + public void testJobFinishedBeforeSavepointFuture() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + 
CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + + savepointFuture.complete(SAVEPOINT_PATH); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testTransitiontoCancellingOnCancel() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setExpectCancelling(assertNonNull()); + + sws.cancel(); Review comment: Fair enough. We have tested this with `testExceptionalFutureCompletionOnLeaveWhileWaitingOnSavepointCompletion` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testJobFailed() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getExecutionGraph().getState(), + is(JobStatus.FAILED)); + assertThat( + failingArguments.getFailureCause(), + containsCause(FlinkException.class)); + }); + + // fail job: + mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); + } Review comment: Does this test complete the savepoint operation? It would be good to know for the test whether the state waits for it or not.
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; 
+import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. */ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testJobFailed() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getExecutionGraph().getState(), + is(JobStatus.FAILED)); + assertThat( + failingArguments.getFailureCause(), + containsCause(FlinkException.class)); + }); + + // fail job: + mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); + } + } + + @Test + public void testJobFinishedBeforeSavepointFuture() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + 
StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + + savepointFuture.complete(SAVEPOINT_PATH); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testTransitiontoCancellingOnCancel() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setExpectCancelling(assertNonNull()); + + sws.cancel(); Review comment: We should check that the operation has failed. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. */ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); Review comment: Are we testing the different orderings of signals?
1) First savepoint future completion and then final state 2) First final state and then savepoint future completion ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testJobFailed() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getExecutionGraph().getState(), + is(JobStatus.FAILED)); + assertThat( + failingArguments.getFailureCause(), + containsCause(FlinkException.class)); + }); + + // fail job: + mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); + } + } + + @Test + public void testJobFinishedBeforeSavepointFuture() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + 
CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + + savepointFuture.complete(SAVEPOINT_PATH); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testTransitiontoCancellingOnCancel() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setExpectCancelling(assertNonNull()); + + sws.cancel(); + } + } + + @Test + public void testTransitionToFinishedOnSuspend() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)); + }); + + sws.suspend(new RuntimeException()); + } + } + + @Test + public void testRestartOnGlobalFailureIfRestartConfigured() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure( + (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + + ctx.setExpectRestarting(assertNonNull()); + + sws.handleGlobalFailure(new RuntimeException()); + } + } + + @Test + public void testFailingOnGlobalFailureIfNoRestartConfigured() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + 
ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getFailureCause(), + containsCause(RuntimeException.class)); + }); + + sws.handleGlobalFailure(new RuntimeException()); + } + } + + @Test + public void testFailingOnUpdateTaskExecutionStateWithNoRestart() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, new StateTrackingMockExecutionGraph()); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getFailureCause(), + containsCause(RuntimeException.class)); + }); + + assertThat(sws.updateTaskExecutionState(createFailingStateTransition()), is(true)); + } + } + + @Test + public void testRestartingOnUpdateTaskExecutionStateWithRestart() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, new StateTrackingMockExecutionGraph()); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure( + (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + + ctx.setExpectRestarting(assertNonNull()); + + assertThat(sws.updateTaskExecutionState(createFailingStateTransition()), is(true)); + } + } + + @Test + public void testExceptionalFutureCompletionOnLeaveWhileWaitingOnSavepointCompletion() Review comment: ```suggestion public void testExceptionalOperationFutureCompletionOnLeaveWhileWaitingOnSavepointCompletion() ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testJobFailed() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getExecutionGraph().getState(), + is(JobStatus.FAILED)); + assertThat( + failingArguments.getFailureCause(), + containsCause(FlinkException.class)); + }); + + // fail job: + mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); + } + } + + @Test + public void testJobFinishedBeforeSavepointFuture() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + 
CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + + savepointFuture.complete(SAVEPOINT_PATH); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testTransitiontoCancellingOnCancel() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setExpectCancelling(assertNonNull()); + + sws.cancel(); + } + } + + @Test + public void testTransitionToFinishedOnSuspend() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)); + }); + + sws.suspend(new RuntimeException()); + } + } + + @Test + public void testRestartOnGlobalFailureIfRestartConfigured() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure( + (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + + ctx.setExpectRestarting(assertNonNull()); + + sws.handleGlobalFailure(new RuntimeException()); Review comment: Same here. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testJobFailed() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getExecutionGraph().getState(), + is(JobStatus.FAILED)); + assertThat( + failingArguments.getFailureCause(), + containsCause(FlinkException.class)); + }); + + // fail job: + mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); + } + } + + @Test + public void testJobFinishedBeforeSavepointFuture() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + 
CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + + savepointFuture.complete(SAVEPOINT_PATH); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testTransitiontoCancellingOnCancel() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setExpectCancelling(assertNonNull()); + + sws.cancel(); + } + } + + @Test + public void testTransitionToFinishedOnSuspend() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)); + }); + + sws.suspend(new RuntimeException()); + } + } + + @Test + public void testRestartOnGlobalFailureIfRestartConfigured() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure( + (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + + ctx.setExpectRestarting(assertNonNull()); + + sws.handleGlobalFailure(new RuntimeException()); Review comment: Covered by `testExceptionalFutureCompletionOnLeaveWhileWaitingOnSavepointCompletion` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +/** + * Tracks a "stop with savepoint" operation. The incoming "savepointFuture" is coming from the + * {@link CheckpointCoordinator}, which takes care of triggering a savepoint, and then shutting down + * the job (on success). + * + * <p>This state is tracking the future to act accordingly on it. 
The savepoint path (= the result + * of the operation) is made available via the "operationFuture" to the user. This operation is only + * considered successful if the "savepointFuture" completed successfully, and the job reached the + * terminal state FINISHED. + */ +class StopWithSavepoint extends StateWithExecutionGraph { + + private final Context context; + private final ClassLoader userCodeClassLoader; + + private final CompletableFuture<String> operationFuture; + + private final CheckpointScheduling checkpointScheduling; + + private boolean hasFullyFinished = false; + + @Nullable private String savepoint = null; + + @Nullable private Throwable operationFailureCause; + + StopWithSavepoint( + Context context, + ExecutionGraph executionGraph, + ExecutionGraphHandler executionGraphHandler, + OperatorCoordinatorHandler operatorCoordinatorHandler, + CheckpointScheduling checkpointScheduling, + Logger logger, + ClassLoader userCodeClassLoader, + CompletableFuture<String> savepointFuture) { + super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger); + this.context = context; + this.userCodeClassLoader = userCodeClassLoader; + this.checkpointScheduling = checkpointScheduling; + this.operationFuture = new CompletableFuture<>(); + + FutureUtils.assertNoException( + savepointFuture.handle( + (savepointLocation, throwable) -> { + context.runIfState( + this, + () -> handleSavepointCompletion(savepointLocation, throwable), + Duration.ZERO); + return null; + })); + } + + private void handleSavepointCompletion( + @Nullable String savepoint, @Nullable Throwable throwable) { + if (hasFullyFinished) { + Preconditions.checkState( + throwable == null, + "A savepoint should never fail after a job has been terminated via stop-with-savepoint."); + completeOperationAndGoToFinished(savepoint); + } else { + if (throwable != null) { + operationFailureCause = throwable; + checkpointScheduling.startCheckpointScheduler(); + context.goToExecuting( +
getExecutionGraph(), + getExecutionGraphHandler(), + getOperatorCoordinatorHandler()); + } else { + this.savepoint = savepoint; + } + } + } + + @Override + public void onLeave(Class<? extends State> newState) { + this.operationFuture.completeExceptionally( + new FlinkException( + "Stop with savepoint operation could not be completed.", + operationFailureCause)); + + super.onLeave(newState); + } + + @Override + public void cancel() { + context.goToCanceling( + getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler()); + } + + @Override + public JobStatus getJobStatus() { + return JobStatus.RUNNING; + } + + @Override + public void handleGlobalFailure(Throwable cause) { + handleAnyFailure(cause); + } + + @Override + boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) { + final boolean successfulUpdate = + getExecutionGraph().updateState(taskExecutionStateTransition); + + if (successfulUpdate) { + if (taskExecutionStateTransition.getExecutionState() == ExecutionState.FAILED) { + Throwable cause = taskExecutionStateTransition.getError(userCodeClassLoader); + handleAnyFailure(cause); + } + } + + return successfulUpdate; + } + + @Override + void onGloballyTerminalState(JobStatus globallyTerminalState) { + if (globallyTerminalState == JobStatus.FINISHED) { + if (savepoint == null) { + hasFullyFinished = true; + } else { + completeOperationAndGoToFinished(savepoint); + } + } else { + handleAnyFailure(new FlinkException("Job did not finish properly.")); Review comment: ```suggestion handleAnyFailure(new FlinkException("Job did not reach the FINISHED state while performing stop-with-savepoint.")); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java ########## @@ -246,7 +251,30 @@ public void testFalseReportsViaUpdateTaskExecutionStateAreIgnored() throws Excep } } - private static TaskExecutionStateTransition createFailingStateTransition() { + @Test 
+ public void testTransitionToStopWithSavepointState() throws Exception { + try (MockExecutingContext ctx = new MockExecutingContext()) { + CheckpointCoordinator coordinator = + new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().build(); + StateTrackingMockExecutionGraph mockedExecutionGraphWithCheckpointCoordinator = + new StateTrackingMockExecutionGraph() { + @Nullable + @Override + public CheckpointCoordinator getCheckpointCoordinator() { + return coordinator; + } + }; + Executing exec = + new ExecutingStateBuilder() + .setExecutionGraph(mockedExecutionGraphWithCheckpointCoordinator) + .build(ctx); + + ctx.setExpectStopWithSavepoint(assertNonNull()); + exec.stopWithSavepoint("file:///tmp/target", true); Review comment: We should check that the checkpoint scheduling has been stopped. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testJobFailed() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getExecutionGraph().getState(), + is(JobStatus.FAILED)); + assertThat( + failingArguments.getFailureCause(), + containsCause(FlinkException.class)); + }); + + // fail job: + mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); + } + } + + @Test + public void testJobFinishedBeforeSavepointFuture() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + 
CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + + savepointFuture.complete(SAVEPOINT_PATH); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testTransitiontoCancellingOnCancel() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setExpectCancelling(assertNonNull()); + + sws.cancel(); + } + } + + @Test + public void testTransitionToFinishedOnSuspend() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)); + }); + + sws.suspend(new RuntimeException()); Review comment: Same here. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testJobFailed() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getExecutionGraph().getState(), + is(JobStatus.FAILED)); + assertThat( + failingArguments.getFailureCause(), + containsCause(FlinkException.class)); + }); + + // fail job: + mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); + } + } + + @Test + public void testJobFinishedBeforeSavepointFuture() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + 
CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + + savepointFuture.complete(SAVEPOINT_PATH); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testTransitiontoCancellingOnCancel() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setExpectCancelling(assertNonNull()); + + sws.cancel(); + } + } + + @Test + public void testTransitionToFinishedOnSuspend() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)); + }); + + sws.suspend(new RuntimeException()); + } + } + + @Test + public void testRestartOnGlobalFailureIfRestartConfigured() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure( + (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + + ctx.setExpectRestarting(assertNonNull()); + + sws.handleGlobalFailure(new RuntimeException()); + } + } + + @Test + public void testFailingOnGlobalFailureIfNoRestartConfigured() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + 
ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getFailureCause(), + containsCause(RuntimeException.class)); + }); + + sws.handleGlobalFailure(new RuntimeException()); Review comment: Same here. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testJobFailed() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getExecutionGraph().getState(), + is(JobStatus.FAILED)); + assertThat( + failingArguments.getFailureCause(), + containsCause(FlinkException.class)); + }); + + // fail job: + mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); + } + } + + @Test + public void testJobFinishedBeforeSavepointFuture() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + 
CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + + savepointFuture.complete(SAVEPOINT_PATH); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testTransitiontoCancellingOnCancel() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setExpectCancelling(assertNonNull()); + + sws.cancel(); + } + } + + @Test + public void testTransitionToFinishedOnSuspend() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)); + }); + + sws.suspend(new RuntimeException()); + } + } + + @Test + public void testRestartOnGlobalFailureIfRestartConfigured() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure( + (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + + ctx.setExpectRestarting(assertNonNull()); + + sws.handleGlobalFailure(new RuntimeException()); + } + } + + @Test + public void testFailingOnGlobalFailureIfNoRestartConfigured() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + 
ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getFailureCause(), + containsCause(RuntimeException.class)); + }); + + sws.handleGlobalFailure(new RuntimeException()); Review comment: Covered by `testExceptionalFutureCompletionOnLeaveWhileWaitingOnSavepointCompletion` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.checkpoint.CheckpointScheduling; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.runtime.scheduler.adaptive.ExecutingTest.createFailingStateTransition; +import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link StopWithSavepoint} state. 
*/ +public class StopWithSavepointTest extends TestLogger { + private static final String SAVEPOINT_PATH = "test://savepoint/path"; + + @Test + public void testFinishedOnSuccessfulStopWithSavepoint() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + savepointFuture.complete(SAVEPOINT_PATH); + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testJobFailed() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph); + ctx.setStopWithSavepoint(sws); + ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); + + ctx.setExpectFailing( + failingArguments -> { + assertThat( + failingArguments.getExecutionGraph().getState(), + is(JobStatus.FAILED)); + assertThat( + failingArguments.getFailureCause(), + containsCause(FlinkException.class)); + }); + + // fail job: + mockExecutionGraph.completeTerminationFuture(JobStatus.FAILED); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); + } + } + + @Test + public void testJobFinishedBeforeSavepointFuture() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StateTrackingMockExecutionGraph mockExecutionGraph = + new StateTrackingMockExecutionGraph(); + 
CompletableFuture<String> savepointFuture = new CompletableFuture<>(); + + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockExecutionGraph, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectFinished(assertNonNull()); + + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + + savepointFuture.complete(SAVEPOINT_PATH); + ctx.triggerScheduledExecutors(); + + assertThat(sws.getOperationFuture().get(), is(SAVEPOINT_PATH)); + } + } + + @Test + public void testTransitiontoCancellingOnCancel() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setStopWithSavepoint(sws); + ctx.setExpectCancelling(assertNonNull()); + + sws.cancel(); + } + } + + @Test + public void testTransitionToFinishedOnSuspend() throws Exception { + try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { + StopWithSavepoint sws = createStopWithSavepoint(ctx); + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)); + }); + + sws.suspend(new RuntimeException()); Review comment: Covered by `testExceptionalFutureCompletionOnLeaveWhileWaitingOnSavepointCompletion` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org