rkhachatryan commented on a change in pull request #15728: URL: https://github.com/apache/flink/pull/15728#discussion_r625298504
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java ########## @@ -149,18 +150,80 @@ public void testComputeWithMultipleLevels() throws Exception { } @Test - public void testWithTriggeredTasksNotRunning() throws Exception { + public void testNotRunningOneOfSourcesTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with one RUNNING source and NOT RUNNING source. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> twoSourcesBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id) + .addJobVertex(new JobVertexID()) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan. + runWithNotRunTask(twoSourcesBuilder); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningSingleSourceTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with one NOT RUNNING source and RUNNING not source task. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> sourceAndNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id) + .addJobVertex(new JobVertexID(), false) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan. + runWithNotRunTask(sourceAndNotSourceBuilder); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningOneOfNotSourcesTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with NOT RUNNING not source and RUNNING not source task. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> twoNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id, false) + .addJobVertex(new JobVertexID(), false) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan. + runWithNotRunTask(twoNotSourceBuilder); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningSingleNotSourceTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with NOT RUNNING not source and RUNNING source tasks. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> sourceAndNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id, false) + .addJobVertex(new JobVertexID()) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan. + runWithNotRunTask(sourceAndNotSourceBuilder); + + // then: The plan failed because one task didn't have RUNNING state. + } + + private void runWithNotRunTask( + FunctionWithException<JobVertexID, ExecutionGraph, Exception> graphBuilder) Review comment: Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org