zhuzhurk commented on a change in pull request #11774: URL: https://github.com/apache/flink/pull/11774#discussion_r414269826
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ########## @@ -898,6 +825,116 @@ public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled( assertDistinctSharingGroups(source1Vertex, source2Vertex, map2Vertex); } + @Test + public void testDefaultGlobalDataExchangeModeIsAllEdgesPipelined() { + final StreamGraph streamGraph = createStreamGraphForGlobalDataExchangeModeTests(); + assertThat(streamGraph.getGlobalDataExchangeMode(), is(GlobalDataExchangeMode.ALL_EDGES_PIPELINED)); + } + + @Test + public void testAllEdgesBlockingMode() { + final StreamGraph streamGraph = createStreamGraphForGlobalDataExchangeModeTests(); + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING); + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + final JobVertex sourceVertex = verticesSorted.get(0); + final JobVertex map1Vertex = verticesSorted.get(1); + final JobVertex map2Vertex = verticesSorted.get(2); + + assertEquals(ResultPartitionType.BLOCKING, sourceVertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.BLOCKING, map1Vertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.BLOCKING, map2Vertex.getProducedDataSets().get(0).getResultType()); + } + + @Test + public void testAllEdgesPipelinedMode() { + final StreamGraph streamGraph = createStreamGraphForGlobalDataExchangeModeTests(); + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED); + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + final JobVertex sourceVertex = verticesSorted.get(0); + final JobVertex map1Vertex = verticesSorted.get(1); + final JobVertex map2Vertex = verticesSorted.get(2); + + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, map1Vertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, map2Vertex.getProducedDataSets().get(0).getResultType()); + } + + @Test + public void testForwardEdgesPipelinedMode() { + final StreamGraph streamGraph = createStreamGraphForGlobalDataExchangeModeTests(); + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED); + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + final JobVertex sourceVertex = verticesSorted.get(0); + final JobVertex map1Vertex = verticesSorted.get(1); + final JobVertex map2Vertex = verticesSorted.get(2); + + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.BLOCKING, map1Vertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.BLOCKING, map2Vertex.getProducedDataSets().get(0).getResultType()); + } + + @Test + public void testPointwiseEdgesPipelinedMode() { + final StreamGraph streamGraph = createStreamGraphForGlobalDataExchangeModeTests(); + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED); + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + final JobVertex sourceVertex = verticesSorted.get(0); + final JobVertex map1Vertex = verticesSorted.get(1); + final JobVertex map2Vertex = verticesSorted.get(2); + + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, map1Vertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.BLOCKING, map2Vertex.getProducedDataSets().get(0).getResultType()); + } + + @Test + public void testGlobalDataExchangeModeDoesNotOverrideSpecifiedShuffleMode() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final DataStream<Integer> source = env.fromElements(1, 2, 3).setParallelism(1); + final DataStream<Integer> forward = new DataStream<>(env, new PartitionTransformation<>( + source.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.PIPELINED)); + forward.map(i -> i).startNewChain().setParallelism(1); + final StreamGraph streamGraph = env.getStreamGraph(); + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING); + + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + final JobVertex sourceVertex = verticesSorted.get(0); + + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType()); + } + + /** + * Topology: source(parallelism=1) --(forward)--> map1(parallelism=1) + * --(rescale)--> map2(parallelism=2) --(rebalance)--> sink(parallelism=2). + */ + private StreamGraph createStreamGraphForGlobalDataExchangeModeTests() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStream<Integer> source = env.fromElements(1, 2, 3).setParallelism(1); + + final DataStream<Integer> forward = new DataStream<>(env, new PartitionTransformation<>( + source.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.UNDEFINED)); + final DataStream<Integer> map1 = forward.map(i -> i).startNewChain().setParallelism(1); + + final DataStream<Integer> rescale = new DataStream<>(env, new PartitionTransformation<>( + map1.getTransformation(), new RescalePartitioner<>(), ShuffleMode.UNDEFINED)); + final DataStream<Integer> map2 = rescale.map(i -> i).setParallelism(2); Review comment: By specifying ShuffleMode.UNDEFINED these test do not need to have the assumption that `ShuffleMode.UNDEFINED` is the default value. So later if the default value is changed it will not break these tests. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org