XComp commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901097551


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -57,11 +79,12 @@ class StopWithSavepoint extends StateWithExecutionGraph {
 
     private final CheckpointScheduling checkpointScheduling;
 
-    private boolean hasFullyFinished = false;
-
-    @Nullable private String savepoint = null;
-
     @Nullable private Throwable operationFailureCause;
+    private boolean hasPendingStateTransition = false;
+
+    // be careful when applying operations on this future that can trigger state transitions,
+    // as several other methods do the same and we mustn't trigger multiple transitions!
+    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

Review Comment:
   May we add a comment to the `operationFuture` as well, to distinguish the two from each other? As far as I understand, the purpose of the `operationFuture` is to have a future that completes, depending on the internal state, as soon as the subsequent state transition happens.
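   A minimal sketch of how the two futures could be documented side by side. `TwoFutureSketch`, `onJobTerminated` and `transitionCount` are hypothetical names for illustration, not Flink's actual `StopWithSavepoint` code; the sketch assumes, following the comment above, that `operationFuture` only completes once the follow-up state transition happens, and that all callbacks run on a single main thread, so a plain boolean guard is enough to prevent a second transition.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified holder for the two futures; not Flink's actual StopWithSavepoint. */
class TwoFutureSketch {

    /**
     * Internal: completes as soon as the savepoint has been written. Completing it must not by
     * itself report success to callers; several code paths react to it, so at most one of them
     * may trigger a state transition.
     */
    final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

    /**
     * External: the result future that callers wait on. It completes with the savepoint path
     * only once the subsequent state transition actually happens.
     */
    final CompletableFuture<String> operationFuture = new CompletableFuture<>();

    // Guards against triggering more than one state transition (single-threaded assumption).
    private boolean hasPendingStateTransition = false;

    // Counts transitions so the single-transition guarantee can be checked.
    int transitionCount = 0;

    /** Hypothetical hook: the job has reached a terminal state after stopping. */
    void onJobTerminated() {
        internalSavepointFuture.thenAccept(this::transitionToFinished);
    }

    private void transitionToFinished(String savepointPath) {
        if (hasPendingStateTransition) {
            return; // another code path already triggered the transition
        }
        hasPendingStateTransition = true;
        transitionCount++;
        operationFuture.complete(savepointPath);
    }
}
```

   Keeping the "must not trigger more than one transition" warning on `internalSavepointFuture` and the "completes on transition" contract on `operationFuture` makes it harder to mix the two up when chaining callbacks.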



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -337,6 +343,70 @@ void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception {
         }
     }
 
+    @Test
+    void testOnFailureWaitsForSavepointCompletion() throws Exception {
+        try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+            CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+            CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+            StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
+            StopWithSavepoint sws =
+                    createStopWithSavepoint(
+                            ctx, mockStopWithSavepointOperations, executionGraph, savepointFuture);
+            ctx.setStopWithSavepoint(sws);
+
+            ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
+
+            sws.onFailure(new Exception("task failure"));
+            ctx.triggerExecutors();

Review Comment:
   Should we add a comment here (and in the other two test cases) indicating that we're not expecting anything to happen here because the `onFailure` call should have triggered a state change? That would add a bit of context, because the test would succeed without this line as well.
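   A minimal, self-contained sketch of why a bare executor trigger can be a silent no-op, assuming the behavior the test name describes (a failure is only acted on once the savepoint future completes). `ManualExecutor` and `DeferringFailureSketch` are hypothetical stand-ins for the test's mock context and the state under test, not Flink classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical executor that only runs queued tasks when triggered, like a manual test executor. */
class ManualExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        tasks.add(command);
    }

    /** Runs every queued task, including tasks enqueued while draining. */
    void triggerAll() {
        while (!tasks.isEmpty()) {
            tasks.poll().run();
        }
    }
}

/** Hypothetical state that acts on a failure only after the savepoint future completes. */
class DeferringFailureSketch {
    final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
    String lastTransition = null; // null means no transition has happened yet

    void onFailure(Throwable cause, Executor mainThreadExecutor) {
        // No task is submitted to the executor until savepointFuture completes.
        savepointFuture.whenCompleteAsync(
                (path, error) -> lastTransition = "Restarting: " + cause.getMessage(),
                mainThreadExecutor);
    }
}
```

   Triggering the executor right after `onFailure` runs nothing here, which is exactly why a short comment directly above `ctx.triggerExecutors();`, stating which transition is (or is not) expected at that point, would make the purpose of the line explicit.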



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -82,7 +82,9 @@ void testJobFailed() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
-            StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");

Review Comment:
   I guess we should extend the test name here to something like `testJobFailedWithSavepointCreationSuccessful` to distinguish it more clearly from the other test cases.
   
   Additionally, we should verify that a `StopWithSavepointStopException` is part of the cause chain for this specific test case, shouldn't we?
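   A minimal sketch of such a cause-chain check. The `StopWithSavepointStopException` below is a hypothetical stand-in so the snippet compiles on its own, and `CauseChains.findCause` is an illustrative helper; Flink's `org.apache.flink.util.ExceptionUtils.findThrowable` appears to serve the same purpose and would likely be the natural choice inside the actual test.

```java
import java.util.Optional;

/** Hypothetical stand-in for Flink's StopWithSavepointStopException, for illustration only. */
class StopWithSavepointStopException extends RuntimeException {
    StopWithSavepointStopException(String message) {
        super(message);
    }
}

final class CauseChains {
    private CauseChains() {}

    /** Walks the cause chain of {@code t} and returns the first throwable of type {@code type}. */
    static <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> type) {
        Throwable current = t;
        while (current != null) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            // Throwable.getCause() returns null for a missing or self-referencing cause.
            current = current.getCause();
        }
        return Optional.empty();
    }
}
```

   In the test, asserting that `findCause(failureCause, StopWithSavepointStopException.class)` is present would then distinguish this "savepoint succeeded, job failed" case from the other failure scenarios.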



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -82,7 +82,9 @@ void testJobFailed() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             StateTrackingMockExecutionGraph mockExecutionGraph =
                     new StateTrackingMockExecutionGraph();
-            StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph);
+            final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");

Review Comment:
   > needs another explicit test for job fails + successful savepoint
   
   Is that the test you're referring to in your [comment above](https://github.com/apache/flink/pull/19968#pullrequestreview-1010663103)?
   
   `testJobFailed` pretty much covers the scenario, doesn't it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
