Hi Dawid,

this is expected behavior. A partitioning is only preserved as long as the parallelism does not change.
In the modified program the data is partitioned correctly (let's say into 8 partitions if the default parallelism is 8). After the partitioning, the 8 partitions have to be reduced to 3, because the map-partition operator runs with parallelism 3. This is done by randomly shuffling the records, which destroys the range partitioning. To preserve the partitioning for the map-partition operator, you have to set the parallelism of the partition operator to 3 as well.

Cheers, Fabian

2016-03-29 20:47 GMT+02:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com>:

> Hi all,
>
> recently I am working on FLINK-2946
> <https://issues.apache.org/jira/browse/FLINK-2946> and I am supposed to use
> range partitioning, but I am not sure about the behaviour. I've slightly
> adjusted PartitionITCase#testRangePartitionerOnSequenceData to set a
> custom parallelism after partitioning, and this makes the test fail.
> Is that the right behaviour or is it a bug?
> Will be grateful for any comments.
>
> The adjusted code:
>
> @Test
> public void testRangePartitionerOnSequenceData() throws Exception {
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     DataSource<Long> dataSource = env.generateSequence(0, 10000);
>     KeySelector<Long, Long> keyExtractor = new ObjectSelfKeySelector();
>
>     MapPartitionFunction<Long, Tuple2<Long, Long>> MinMaxSelector = new MinMaxSelector();
>
>     Comparator<Tuple2<Long, Long>> tuple2Comparator = new Tuple2Comparator();
>
>     final MapPartitionOperator<Long, Tuple2<Long, Long>> dataSourceToCollect = dataSource
>         .partitionByRange(keyExtractor).mapPartition(MinMaxSelector).setParallelism(3);
>
>     List<Tuple2<Long, Long>> collected = dataSourceToCollect.collect();
>     Collections.sort(collected, tuple2Comparator);
>
>     long previousMax = -1;
>     for (Tuple2<Long, Long> tuple2 : collected) {
>         if (previousMax == -1) {
>             previousMax = tuple2.f1;
>         } else {
>             long currentMin = tuple2.f0;
>             assertTrue(tuple2.f0 < tuple2.f1);
>             assertEquals(previousMax + 1, currentMin);
>             previousMax = tuple2.f1;
>         }
>     }
> }
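
To make the effect concrete, here is a small plain-Java sketch. It is a simulation only, not Flink code and not how Flink implements partitioning: the class and helper names (RangePartitionDemo, rangePartition, redistribute, rangesDisjoint) are made up for illustration, and a round-robin redistribution stands in for the shuffle that happens when the parallelism changes. It range-partitions the sequence 0..10000 into 8 partitions and redistributes them to 3, then compares that with range-partitioning directly into 3. In the quoted test, the second case would presumably correspond to calling setParallelism(3) on the operator returned by partitionByRange(keyExtractor) before mapPartition is applied.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative simulation only; all names here are hypothetical.
class RangePartitionDemo {

    // Create p empty partitions.
    static List<List<Long>> emptyPartitions(int p) {
        List<List<Long>> parts = new ArrayList<>();
        for (int i = 0; i < p; i++) {
            parts.add(new ArrayList<>());
        }
        return parts;
    }

    // Assign each value in [0, n) to one of p contiguous, equally wide ranges.
    static List<List<Long>> rangePartition(long n, int p) {
        List<List<Long>> parts = emptyPartitions(p);
        for (long v = 0; v < n; v++) {
            parts.get((int) (v * p / n)).add(v);
        }
        return parts;
    }

    // Assumption: round-robin stands in for the redistribution that takes
    // place when the downstream operator has a different parallelism.
    static List<List<Long>> redistribute(List<List<Long>> parts, int target) {
        List<List<Long>> out = emptyPartitions(target);
        int next = 0;
        for (List<Long> part : parts) {
            for (Long v : part) {
                out.get(next).add(v);
                next = (next + 1) % target;
            }
        }
        return out;
    }

    // True if the [min, max] intervals of all non-empty partitions are
    // pairwise disjoint, i.e. the data is still range-partitioned.
    static boolean rangesDisjoint(List<List<Long>> parts) {
        List<long[]> bounds = new ArrayList<>();
        for (List<Long> part : parts) {
            if (!part.isEmpty()) {
                bounds.add(new long[] {Collections.min(part), Collections.max(part)});
            }
        }
        bounds.sort((a, b) -> Long.compare(a[0], b[0]));
        for (int i = 1; i < bounds.size(); i++) {
            if (bounds.get(i - 1)[1] >= bounds.get(i)[0]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long n = 10001; // values 0..10000, as in the test
        List<List<Long>> reduced = redistribute(rangePartition(n, 8), 3);
        List<List<Long>> direct = rangePartition(n, 3);
        System.out.println("8 partitions redistributed to 3, ranges disjoint: "
                + rangesDisjoint(reduced));
        System.out.println("partitioned directly into 3, ranges disjoint: "
                + rangesDisjoint(direct));
    }
}
```

In the first case every target partition ends up with values from the whole key range, so a min/max check like the one in the test cannot succeed. In the second case each partition holds one contiguous range.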