eaglewatcherwb commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284989584
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
##########
@@ -62,121 +49,73 @@
 /**
  * Unit tests for {@link DefaultResultPartition}.
  */
-public class DefaultResultPartitionTest {
+public class DefaultResultPartitionTest extends TestLogger {
 
-	private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+	private final DefaultExecutionVertexTest.ExecutionStateProviderTest stateProvider = new DefaultExecutionVertexTest.ExecutionStateProviderTest();
 
-	private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
+	private List<SchedulingExecutionVertex> schedulingExecutionVertices;
 
-	private ExecutionGraph executionGraph;
-
-	private ExecutionGraphToSchedulingTopologyAdapter adapter;
-
-	private List<IntermediateResultPartition> intermediateResultPartitions;
-
-	private List<SchedulingResultPartition> schedulingResultPartitions;
+	private DefaultResultPartition resultPartition;
 
 	@Before
-	public void setUp() throws Exception {
-		final int parallelism = 3;
-		JobVertex[] jobVertices = new JobVertex[2];
-		jobVertices[0] = createNoOpVertex(parallelism);
-		jobVertices[1] = createNoOpVertex(parallelism);
-		jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, BLOCKING);
-		jobVertices[0].setInputDependencyConstraint(ALL);
-		jobVertices[1].setInputDependencyConstraint(ANY);
-		executionGraph = createSimpleTestGraph(
-			new JobID(),
-			taskManagerGateway,
-			triggeredRestartStrategy,
-			jobVertices);
-		adapter = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph);
-
-		intermediateResultPartitions = new ArrayList<>();
-		schedulingResultPartitions = new ArrayList<>();
-
-		for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
-			for (Map.Entry<IntermediateResultPartitionID, IntermediateResultPartition> entry
-				: vertex.getProducedPartitions().entrySet()) {
-				intermediateResultPartitions.add(entry.getValue());
-				schedulingResultPartitions.add(adapter.getResultPartition(entry.getKey())
-					.orElseThrow(() -> new IllegalArgumentException("can not find partition" + entry.getKey())));
-			}
-		}
-		assertEquals(parallelism, intermediateResultPartitions.size());
-	}
-
-	@Test
-	public void testBasicInfo() {
-		for (int idx = 0; idx < intermediateResultPartitions.size(); idx++) {
-			final IntermediateResultPartition partition = intermediateResultPartitions.get(idx);
-			final SchedulingResultPartition schedulingResultPartition = schedulingResultPartitions.get(idx);
-			assertEquals(partition.getPartitionId(), schedulingResultPartition.getId());
-			assertEquals(partition.getIntermediateResult().getId(), schedulingResultPartition.getResultId());
-			assertEquals(partition.getResultType(), schedulingResultPartition.getPartitionType());
-		}
+	public void setUp() {
+		schedulingExecutionVertices = new ArrayList<>(2);

Review comment:
   OK

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services