Hi all, recently I am working on FLINK-2946 <https://issues.apache.org/jira/browse/FLINK-2946> and I am supposed to use range partitioning, but I am not sure about the behaviour. I've adjusted a little bit PartitionITCase#testRangePartitionerOnSequenceData so to set custom parallelism after partitioning and it results in failing the test. Is that a right behaviour or is that a bug? Will be grateful for any comments.
The adjusted code: @Test public void testRangePartitionerOnSequenceData() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Long> dataSource = env.generateSequence(0, 10000); KeySelector<Long, Long> keyExtractor = new ObjectSelfKeySelector(); MapPartitionFunction<Long, Tuple2<Long, Long>> MinMaxSelector = new MinMaxSelector(); Comparator<Tuple2<Long, Long>> tuple2Comparator = new Tuple2Comparator(); final MapPartitionOperator<Long, Tuple2<Long, Long>> dataSourceToCollect = dataSource .partitionByRange(keyExtractor).mapPartition(MinMaxSelector).setParallelism(3); List<Tuple2<Long, Long>> collected = dataSourceToCollect.collect(); Collections.sort(collected, tuple2Comparator); long previousMax = -1; for (Tuple2<Long, Long> tuple2 : collected) { if (previousMax == -1) { previousMax = tuple2.f1; } else { long currentMin = tuple2.f0; assertTrue(tuple2.f0 < tuple2.f1); assertEquals(previousMax + 1, currentMin); previousMax = tuple2.f1; } } }