rkhachatryan commented on a change in pull request #15728: URL: https://github.com/apache/flink/pull/15728#discussion_r625010241
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java ########## @@ -149,18 +150,80 @@ public void testComputeWithMultipleLevels() throws Exception { } @Test - public void testWithTriggeredTasksNotRunning() throws Exception { + public void testNotRunningOneOfSourcesTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with one RUNNING source and NOT RUNNING source. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> twoSourcesBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id) + .addJobVertex(new JobVertexID()) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan. + runWithNotRunTask(twoSourcesBuilder); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningSingleSourceTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with one NOT RUNNING source and RUNNING not source task. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> sourceAndNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id) + .addJobVertex(new JobVertexID(), false) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan. + runWithNotRunTask(sourceAndNotSourceBuilder); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningOneOfNotSourcesTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with NOT RUNNING not source and RUNNING not source task. 
+ FunctionWithException<JobVertexID, ExecutionGraph, Exception> twoNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id, false) + .addJobVertex(new JobVertexID(), false) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan. + runWithNotRunTask(twoNotSourceBuilder); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningSingleNotSourceTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with NOT RUNNING not source and RUNNING source tasks. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> sourceAndNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id, false) + .addJobVertex(new JobVertexID()) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) Review comment: I think this line doesn't have any effect since later this vertex `id` transitions to some non-RUNNING state unconditionally. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java ########## @@ -149,18 +150,80 @@ public void testComputeWithMultipleLevels() throws Exception { } @Test - public void testWithTriggeredTasksNotRunning() throws Exception { + public void testNotRunningOneOfSourcesTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with one RUNNING source and NOT RUNNING source. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> twoSourcesBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id) + .addJobVertex(new JobVertexID()) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan. 
+ runWithNotRunTask(twoSourcesBuilder); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningSingleSourceTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with one NOT RUNNING source and RUNNING not source task. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> sourceAndNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id) + .addJobVertex(new JobVertexID(), false) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan. + runWithNotRunTask(sourceAndNotSourceBuilder); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningOneOfNotSourcesTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with NOT RUNNING not source and RUNNING not source task. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> twoNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id, false) + .addJobVertex(new JobVertexID(), false) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan. + runWithNotRunTask(twoNotSourceBuilder); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningSingleNotSourceTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with NOT RUNNING not source and RUNNING source tasks. 
+ FunctionWithException<JobVertexID, ExecutionGraph, Exception> sourceAndNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id, false) + .addJobVertex(new JobVertexID()) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan. + runWithNotRunTask(sourceAndNotSourceBuilder); + + // then: The plan failed because one task didn't have RUNNING state. + } + + private void runWithNotRunTask( + FunctionWithException<JobVertexID, ExecutionGraph, Exception> graphBuilder) Review comment: The new test structure seems more complex to me than it could be: - it's not clear which combination of vertex states ends up being tested - ...because the transition happens in several places - ...and the transition filter is not applied to all vertices - test names aren't very readable - we have a `Predicate` field, a `Function` argument, and several functions WDYT about always creating two vertices in the same place and using a boolean flag `isSource`, so that the state combination and DAG are obvious? For example: ``` for (boolean isSource : new boolean[]{true, false}) { // or pass as an argument for (ExecutionState nonRunningState : EnumSet.complementOf(EnumSet.of(ExecutionState.RUNNING))) { JobVertexID runningSource = new JobVertexID(); JobVertexID notRunningVertex = new JobVertexID(); ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() .addJobVertex(runningSource, true) .addJobVertex(notRunningVertex, isSource) .setTransitToRunning(false) // transition runningSource to RUNNING // transition notRunningVertex to nonRunningState // validate ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org