XComp commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901587633
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -377,6 +378,7 @@ void testOnGloballyTerminalStateWaitsForSavepointCompletion() throws Exception {
 ctx.setStopWithSavepoint(sws);
 sws.onGloballyTerminalState(JobStatus.FINISHED);
+ // this is a sanity check that we haven't scheduled a state transition

Review Comment:
There's another location that could use the same comment, in `testConcurrentSavepointFailureAndGloballyTerminalStateCauseRestart` (see [line 407](https://github.com/apache/flink/blob/106b83b88a1dab577b2f60b5a9dc8052526d3820/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java#L407)).

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -49,19 +49,48 @@
 * of the operation) is made available via the "operationFuture" to the user. This operation is only
 * considered successfully if the "savepointFuture" completed successfully, and the job reached the
 * terminal state FINISHED.
+ *
+ * <p>This state has to cover several failure scenarios, depending on whether the savepoint
+ * succeeeds/fails and the job succeeds/fails/keeps running.
+ *
+ * <ul>
+ * <li>Savepoint succeeds, job succeeds - The happy path we like to see.
+ * <li>Savepoint fails, job fails - The generic failure case. Something happened during
+ * checkpointing on the TM side that also failed the task; fail the savepoint operation and
+ * restart the job.
+ * <li>Savepoint succeeds, job fails - Some issue occurred in notifyCheckpointComplete or during
+ * the job shutdown. Fail the savepoint operation and job, but inform the user about the
+ * created savepoint.
+ * <li>Savepoint fails, job keeps running - The savepoint failed due to an error on the JM side,
+ * before we ever triggered anything on the TM side. Fail the savepoint operation, but keep
+ * the job running.
+ * </ul>
+ *
+ * <p>This is further complicated by this information being transmitted via 2 separate RPCs from
+ * TM->JM, with the {@code savepointFuture} not being completed in the main thread, introducing
+ * ordering/lateness issues. Be careful to not liberally use {@link Context#runIfState(State,
+ * Runnable, Duration)} because it can result in a message being lost if multiple operations are
+ * queued and the first initiates a state transition.
 */
 class StopWithSavepoint extends StateWithExecutionGraph {
 private final Context context;
+ /**
+ * The result future of this operation, containing the path to the savepoint. This is the future
+ * that other components (e.g., the REST API) wait for.
+ *
+ * <p>Must only be completed successfully if the savepoint was created and the job has FINISHED.
+ */
 private final CompletableFuture<String> operationFuture;
 private final CheckpointScheduling checkpointScheduling;
- private boolean hasFullyFinished = false;
-
- @Nullable private String savepoint = null;
 @Nullable private Throwable operationFailureCause;
+ private boolean hasPendingStateTransition = false;
+
+ // be careful when applying operations on this future that can trigger state transitions,
+ // as several other methods do the same and we mustn't trigger multiple transitions!
+ private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

Review Comment:
nit: I'm wondering whether we can make this future more descriptive by calling it something like `mainThreadBackedSavepointFuture` or `terminateInMainThreadSavepointFuture`... WDYT?
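The Javadoc's ordering concern (two TM->JM signals, one of them completing off the main thread) and the `mainThreadBackedSavepointFuture` naming idea can be illustrated with a minimal standalone sketch. It assumes a single-threaded `ExecutorService` stands in for the JobMaster main thread; the class `MainThreadSavepointSketch`, its methods, and the state names `"Restarting"`/`"Executing"` are hypothetical and are not Flink API. It models only which state transition fires, not the `operationFuture`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch (not Flink code): serialize two racing signals on one "main thread". */
final class MainThreadSavepointSketch {

    // Assumed stand-in for the JobMaster main thread; the mutable fields below are only touched here.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Completed exclusively on mainThread, so its non-async callbacks also run on mainThread.
    private final CompletableFuture<String> mainThreadBackedSavepointFuture =
            new CompletableFuture<>();

    private Throwable jobFailure = null;
    private boolean hasPendingStateTransition = false;
    private final List<String> transitions = Collections.synchronizedList(new ArrayList<>());

    MainThreadSavepointSketch(CompletableFuture<String> savepointFuture) {
        // Register the reaction first, while the internal future is guaranteed to be pending.
        mainThreadBackedSavepointFuture.whenComplete(this::onSavepointCompleted);
        // The external future may complete on any thread; forward its result onto mainThread.
        savepointFuture.whenCompleteAsync(
                (path, failure) -> {
                    if (failure != null) {
                        mainThreadBackedSavepointFuture.completeExceptionally(failure);
                    } else {
                        mainThreadBackedSavepointFuture.complete(path);
                    }
                },
                mainThread);
    }

    /** Job failure signal (e.g. a task failure RPC); may be called from any thread. */
    public void onJobFailure(Throwable cause) {
        mainThread.execute(
                () -> {
                    jobFailure = cause;
                    if (mainThreadBackedSavepointFuture.isDone()) {
                        transitionOnce("Restarting");
                    } // otherwise onSavepointCompleted sees jobFailure once the savepoint is known
                });
    }

    // Runs on mainThread once the savepoint outcome is known.
    private void onSavepointCompleted(String path, Throwable failure) {
        if (jobFailure != null) {
            transitionOnce("Restarting"); // job failed: restart, whatever the savepoint outcome
        } else if (failure != null) {
            transitionOnce("Executing"); // savepoint failed, job keeps running
        }
        // savepoint succeeded and job healthy: wait for FINISHED (not modeled in this sketch)
    }

    // Runs on mainThread: a second request is dropped once a transition is pending.
    private void transitionOnce(String targetState) {
        if (!hasPendingStateTransition) {
            hasPendingStateTransition = true;
            transitions.add(targetState);
        }
    }

    /** Drains queued main-thread work and returns the transitions that were triggered. */
    public List<String> shutdownAndGetTransitions() {
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(transitions);
    }
}
```

For example, calling `onJobFailure(...)` and only afterwards completing the external savepoint future yields exactly one `"Restarting"` transition, mirroring what `testOnFailureWaitsForSavepointCompletion` checks. Registering the callback before wiring up the forwarding matters: the internal future is still pending at registration time, so the callback runs on the thread that completes it (the main thread) instead of the constructor's caller. `transitionOnce` plays the role of the `hasPendingStateTransition` guard from the diff.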
🤔

##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -336,17 +342,83 @@ public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception
 executionGraph.registerExecution(execution);
 TaskExecutionStateTransition taskExecutionStateTransition =
 ExecutingTest.createFailingStateTransition(execution.getAttemptId(), exception);
- assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition), is(true));
+ assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition)).isTrue();
+ }
+ }
+
+ @Test
+ void testOnFailureWaitsForSavepointCompletion() throws Exception {
+ try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+ CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+ CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+ StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
+ StopWithSavepoint sws =
+ createStopWithSavepoint(
+ ctx, mockStopWithSavepointOperations, executionGraph, savepointFuture);
+ ctx.setStopWithSavepoint(sws);
+
+ ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
+
+ sws.onFailure(new Exception("task failure"));
+ // this is a sanity check that we haven't scheduled a state transition
+ ctx.triggerExecutors();
+
+ ctx.setExpectRestarting(assertNonNull());
+ savepointFuture.complete(SAVEPOINT_PATH);
+ ctx.triggerExecutors();
+ }
+ }
+
+ @Test
+ void testOnGloballyTerminalStateWaitsForSavepointCompletion() throws Exception {

Review Comment:
Isn't that test testing the same as `testFinishedOnSuccessfulStopWithSavepoint` (first test in `StopWithSavepointTest`)?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -145,6 +145,16 @@ public void onLeave(Class<? extends State> newState) {
 }
 @Override

Review Comment:
nit: shouldn't we put the JavaDoc on top of the `@Override` annotation?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org