XComp commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901587633


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -377,6 +378,7 @@ void testOnGloballyTerminalStateWaitsForSavepointCompletion() throws Exception {
             ctx.setStopWithSavepoint(sws);
 
             sws.onGloballyTerminalState(JobStatus.FINISHED);
+            // this is a sanity check that we haven't scheduled a state transition

Review Comment:
   There's another location in `testConcurrentSavepointFailureAndGloballyTerminalStateCauseRestart` (see [line 407](https://github.com/apache/flink/blob/106b83b88a1dab577b2f60b5a9dc8052526d3820/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java#L407)).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -49,19 +49,48 @@
  * of the operation) is made available via the "operationFuture" to the user. This operation is only
  * considered successfully if the "savepointFuture" completed successfully, and the job reached the
  * terminal state FINISHED.
+ *
+ * <p>This state has to cover several failure scenarios, depending on whether the savepoint
+ * succeeds/fails and the job succeeds/fails/keeps running.
+ *
+ * <ul>
+ *   <li>Savepoint succeeds, job succeeds - The happy path we like to see.
+ *   <li>Savepoint fails, job fails - The generic failure case. Something happened during
+ *       checkpointing on the TM side that also failed the task; fail the savepoint operation and
+ *       restart the job.
+ *   <li>Savepoint succeeds, job fails - Some issue occurred in notifyCheckpointComplete or during
+ *       the job shutdown. Fail the savepoint operation and job, but inform the user about the
+ *       created savepoint.
+ *   <li>Savepoint fails, job keeps running - The savepoint failed due to an error on the JM side,
+ *       before we ever triggered anything on the TM side. Fail the savepoint operation, but keep
+ *       the job running.
+ * </ul>
+ *
+ * <p>This is further complicated by this information being transmitted via 2 separate RPCs from
+ * TM->JM, with the {@code savepointFuture} not being completed in the main thread, introducing
+ * ordering/lateness issues. Be careful to not liberally use {@link Context#runIfState(State,
+ * Runnable, Duration)} because it can result in a message being lost if multiple operations are
+ * queued and the first initiates a state transition.
  */
 class StopWithSavepoint extends StateWithExecutionGraph {
 
     private final Context context;
+    /**
+     * The result future of this operation, containing the path to the savepoint. This is the future
+     * that other components (e.g., the REST API) wait for.
+     *
+     * <p>Must only be completed successfully if the savepoint was created and the job has FINISHED.
+     */
     private final CompletableFuture<String> operationFuture;
 
     private final CheckpointScheduling checkpointScheduling;
 
-    private boolean hasFullyFinished = false;
-
-    @Nullable private String savepoint = null;
-
     @Nullable private Throwable operationFailureCause;
+    private boolean hasPendingStateTransition = false;
+
+    // be careful when applying operations on this future that can trigger state transitions,
+    // as several other methods do the same and we mustn't trigger multiple transitions!
+    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

Review Comment:
   nit: I'm wondering whether we can make this future's name more descriptive by calling it something like `mainThreadBackedSavepointFuture` or `terminateInMainThreadSavepointFuture`... WDYT? 🤔
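
   To illustrate what such a name would convey, here is a minimal sketch of a future that mirrors another one but is always completed from a dedicated "main thread" executor. This is not the code from the PR: `MainThreadFutureSketch`, `completeInMainThread`, the thread names, and the savepoint path are hypothetical, and a plain single-threaded `ExecutorService` stands in for the scheduler's main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; none of these names come from the PR.
class MainThreadFutureSketch {

    /**
     * Returns a future that mirrors {@code source} but is always completed from
     * {@code mainThread}, regardless of which thread completes {@code source}.
     */
    static <T> CompletableFuture<T> completeInMainThread(
            CompletableFuture<T> source, Executor mainThread) {
        CompletableFuture<T> mirrored = new CompletableFuture<>();
        source.whenCompleteAsync(
                (value, failure) -> {
                    if (failure != null) {
                        mirrored.completeExceptionally(failure);
                    } else {
                        mirrored.complete(value);
                    }
                },
                mainThread);
        return mirrored;
    }

    /** Completes the source from a foreign thread and returns the thread that completed the mirror. */
    static String demo() {
        // Assumption: a single-threaded executor stands in for the main thread.
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-executor"));
        try {
            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
            CompletableFuture<String> mirrored = completeInMainThread(savepointFuture, mainThread);
            // Registered before completion, so it runs in the thread that completes the mirror.
            CompletableFuture<String> completingThread =
                    mirrored.thenApply(path -> Thread.currentThread().getName());
            new Thread(() -> savepointFuture.complete("/hypothetical/savepoint"), "rpc-thread")
                    .start();
            return completingThread.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            mainThread.shutdownNow();
        }
    }
}
```

   In this sketch, callbacks registered on the mirrored future before it completes run on the executor thread, which is the property the suggested names try to capture.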



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -336,17 +342,83 @@ public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception
             executionGraph.registerExecution(execution);
             TaskExecutionStateTransition taskExecutionStateTransition =
                     ExecutingTest.createFailingStateTransition(execution.getAttemptId(), exception);
-            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition), is(true));
+            assertThat(sws.updateTaskExecutionState(taskExecutionStateTransition)).isTrue();
+        }
+    }
+
+    @Test
+    void testOnFailureWaitsForSavepointCompletion() throws Exception {
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
+            StopWithSavepoint sws =
+                    createStopWithSavepoint(
+                            ctx, mockStopWithSavepointOperations, executionGraph, savepointFuture);
+            ctx.setStopWithSavepoint(sws);
+
+            ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
+
+            sws.onFailure(new Exception("task failure"));
+            // this is a sanity check that we haven't scheduled a state transition
+            ctx.triggerExecutors();
+
+            ctx.setExpectRestarting(assertNonNull());
+            savepointFuture.complete(SAVEPOINT_PATH);
+            ctx.triggerExecutors();
+        }
+    }
+
+    @Test
+    void testOnGloballyTerminalStateWaitsForSavepointCompletion() throws Exception {

Review Comment:
   Isn't this test covering the same case as `testFinishedOnSuccessfulStopWithSavepoint` (the first test in `StopWithSavepointTest`)?
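
   For context, the behaviour the "waits for savepoint completion" tests in this hunk exercise can be sketched roughly as follows. This is a simplified, single-threaded model and not the PR's implementation: `DeferredTransitionSketch`, its `transitions` list, and the `"RESTARTING"` label are hypothetical, and the field name `hasPendingStateTransition` is borrowed from the diff only as a label.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, single-threaded model of "defer the transition until the savepoint completes".
class DeferredTransitionSketch {

    private final CompletableFuture<String> savepointFuture;
    private final List<String> transitions = new ArrayList<>();
    private boolean hasPendingStateTransition = false;

    DeferredTransitionSketch(CompletableFuture<String> savepointFuture) {
        this.savepointFuture = savepointFuture;
    }

    /** Remembers the failure but only transitions once the savepoint future has completed. */
    void onFailure(Throwable cause) {
        if (hasPendingStateTransition) {
            return; // a transition is already queued; never queue a second one
        }
        hasPendingStateTransition = true;
        savepointFuture.whenComplete((path, savepointFailure) -> transitions.add("RESTARTING"));
    }

    /** Shows the recorded transitions before and after the savepoint future completes. */
    static String demo() {
        CompletableFuture<String> savepointFuture = new CompletableFuture<>();
        DeferredTransitionSketch sketch = new DeferredTransitionSketch(savepointFuture);

        sketch.onFailure(new Exception("task failure"));
        sketch.onFailure(new Exception("second failure")); // ignored, transition already pending
        String before = sketch.transitions.toString();

        savepointFuture.complete("/hypothetical/savepoint");
        String after = sketch.transitions.toString();
        return before + " -> " + after;
    }
}
```

   In this model no transition is recorded until the savepoint future completes, and a second failure does not queue a second transition.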



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -145,6 +145,16 @@ public void onLeave(Class<? extends State> newState) {
     }
 
     @Override

Review Comment:
   nit: shouldn't the JavaDoc go above the `@Override` annotation?
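
   A minimal sketch of the ordering meant here, with the documentation comment first and the annotation directly above the method signature. The `State` interface and `Example` class below are hypothetical stand-ins, not the classes from this PR.

```java
// Hypothetical stand-ins used only to show comment/annotation ordering.
class JavadocPlacementSketch {

    interface State {
        void onLeave(String newState);
    }

    static class Example implements State {
        final StringBuilder log = new StringBuilder();

        /**
         * Records the name of the state that is entered next.
         *
         * <p>The Javadoc block comes first; the annotation follows it.
         */
        @Override
        public void onLeave(String newState) {
            log.append(newState);
        }
    }

    /** Calls the documented method once and returns what it recorded. */
    static String demo() {
        Example example = new Example();
        example.onLeave("Finished");
        return example.log.toString();
    }
}
```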



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
