rkhachatryan commented on a change in pull request #15728:
URL: https://github.com/apache/flink/pull/15728#discussion_r625010241



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java
##########
@@ -149,18 +150,80 @@ public void testComputeWithMultipleLevels() throws 
Exception {
     }
 
     @Test
-    public void testWithTriggeredTasksNotRunning() throws Exception {
+    public void testNotRunningOneOfSourcesTriggeredTasksNotRunning() throws 
Exception {
+        // given: Execution graph builder with one RUNNING source and NOT 
RUNNING source.
+        FunctionWithException<JobVertexID, ExecutionGraph, Exception> 
twoSourcesBuilder =
+                (id) ->
+                        new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                                .addJobVertex(id)
+                                .addJobVertex(new JobVertexID())
+                                .setTransitToRunning(vertex -> 
!vertex.getJobvertexId().equals(id))
+                                .build();
+
+        // when: Creating the checkpoint plan.
+        runWithNotRunTask(twoSourcesBuilder);
+
+        // then: The plan failed because one task didn't have RUNNING state.
+    }
+
+    @Test
+    public void testNotRunningSingleSourceTriggeredTasksNotRunning() throws 
Exception {
+        // given: Execution graph builder with one NOT RUNNING source and 
RUNNING not source task.
+        FunctionWithException<JobVertexID, ExecutionGraph, Exception> 
sourceAndNotSourceBuilder =
+                (id) ->
+                        new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                                .addJobVertex(id)
+                                .addJobVertex(new JobVertexID(), false)
+                                .setTransitToRunning(vertex -> 
!vertex.getJobvertexId().equals(id))
+                                .build();
+
+        // when: Creating the checkpoint plan.
+        runWithNotRunTask(sourceAndNotSourceBuilder);
+
+        // then: The plan failed because one task didn't have RUNNING state.
+    }
+
+    @Test
+    public void testNotRunningOneOfNotSourcesTriggeredTasksNotRunning() throws 
Exception {
+        // given: Execution graph builder with NOT RUNNING not source and 
RUNNING not source task.
+        FunctionWithException<JobVertexID, ExecutionGraph, Exception> 
twoNotSourceBuilder =
+                (id) ->
+                        new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                                .addJobVertex(id, false)
+                                .addJobVertex(new JobVertexID(), false)
+                                .setTransitToRunning(vertex -> 
!vertex.getJobvertexId().equals(id))
+                                .build();
+
+        // when: Creating the checkpoint plan.
+        runWithNotRunTask(twoNotSourceBuilder);
+
+        // then: The plan failed because one task didn't have RUNNING state.
+    }
+
+    @Test
+    public void testNotRunningSingleNotSourceTriggeredTasksNotRunning() throws 
Exception {
+        // given: Execution graph builder with NOT RUNNING not source and 
RUNNING source tasks.
+        FunctionWithException<JobVertexID, ExecutionGraph, Exception> 
sourceAndNotSourceBuilder =
+                (id) ->
+                        new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                                .addJobVertex(id, false)
+                                .addJobVertex(new JobVertexID())
+                                .setTransitToRunning(vertex -> 
!vertex.getJobvertexId().equals(id))

Review comment:
       I think this line doesn't have any effect since later this vertex `id` 
transitions to some non-RUNNING state unconditionally.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java
##########
@@ -149,18 +150,80 @@ public void testComputeWithMultipleLevels() throws 
Exception {
     }
 
     @Test
-    public void testWithTriggeredTasksNotRunning() throws Exception {
+    public void testNotRunningOneOfSourcesTriggeredTasksNotRunning() throws 
Exception {
+        // given: Execution graph builder with one RUNNING source and NOT 
RUNNING source.
+        FunctionWithException<JobVertexID, ExecutionGraph, Exception> 
twoSourcesBuilder =
+                (id) ->
+                        new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                                .addJobVertex(id)
+                                .addJobVertex(new JobVertexID())
+                                .setTransitToRunning(vertex -> 
!vertex.getJobvertexId().equals(id))
+                                .build();
+
+        // when: Creating the checkpoint plan.
+        runWithNotRunTask(twoSourcesBuilder);
+
+        // then: The plan failed because one task didn't have RUNNING state.
+    }
+
+    @Test
+    public void testNotRunningSingleSourceTriggeredTasksNotRunning() throws 
Exception {
+        // given: Execution graph builder with one NOT RUNNING source and 
RUNNING not source task.
+        FunctionWithException<JobVertexID, ExecutionGraph, Exception> 
sourceAndNotSourceBuilder =
+                (id) ->
+                        new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                                .addJobVertex(id)
+                                .addJobVertex(new JobVertexID(), false)
+                                .setTransitToRunning(vertex -> 
!vertex.getJobvertexId().equals(id))
+                                .build();
+
+        // when: Creating the checkpoint plan.
+        runWithNotRunTask(sourceAndNotSourceBuilder);
+
+        // then: The plan failed because one task didn't have RUNNING state.
+    }
+
+    @Test
+    public void testNotRunningOneOfNotSourcesTriggeredTasksNotRunning() throws 
Exception {
+        // given: Execution graph builder with NOT RUNNING not source and 
RUNNING not source task.
+        FunctionWithException<JobVertexID, ExecutionGraph, Exception> 
twoNotSourceBuilder =
+                (id) ->
+                        new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                                .addJobVertex(id, false)
+                                .addJobVertex(new JobVertexID(), false)
+                                .setTransitToRunning(vertex -> 
!vertex.getJobvertexId().equals(id))
+                                .build();
+
+        // when: Creating the checkpoint plan.
+        runWithNotRunTask(twoNotSourceBuilder);
+
+        // then: The plan failed because one task didn't have RUNNING state.
+    }
+
+    @Test
+    public void testNotRunningSingleNotSourceTriggeredTasksNotRunning() throws 
Exception {
+        // given: Execution graph builder with NOT RUNNING not source and 
RUNNING source tasks.
+        FunctionWithException<JobVertexID, ExecutionGraph, Exception> 
sourceAndNotSourceBuilder =
+                (id) ->
+                        new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                                .addJobVertex(id, false)
+                                .addJobVertex(new JobVertexID())
+                                .setTransitToRunning(vertex -> 
!vertex.getJobvertexId().equals(id))
+                                .build();
+
+        // when: Creating the checkpoint plan.
+        runWithNotRunTask(sourceAndNotSourceBuilder);
+
+        // then: The plan failed because one task didn't have RUNNING state.
+    }
+
+    private void runWithNotRunTask(
+            FunctionWithException<JobVertexID, ExecutionGraph, Exception> 
graphBuilder)

Review comment:
       The new tests structure seems more complex to me than it could be:
   - it's not clear what vertex states combination is being tested eventually
   - ...because transition happens in several places
   - ...and transition filter is applied to not all vertices
   - test names aren't very readable
   - we have `Predicate` field, `Function` argument and several functions
   
   WDYT about always creating two vertices in the same place and using a 
boolean flag `isSource`, so that the state combination and DAG are obvious?
   
   For example:
   
   ```
   for (boolean isSource : new boolean[]{true, false}) { // or pass as an 
argument
           for (ExecutionState nonRunningState : 
EnumSet.complementOf(EnumSet.of(ExecutionState.RUNNING))) {
               JobVertexID runningSource = new JobVertexID();
               JobVertexID notRunningVertex = new JobVertexID();
               ExecutionGraph graph = new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                   .addJobVertex(runningSource, true)
                   .addJobVertex(notRunningVertex, isSource)
                   .setTransitToRunning(false)
               // transition runningSource to RUNNING
               // transition notRunningVertex to nonRunningState
               // validate
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to