Hi Dawid,

this is expected behavior. A partitioning is only valid until the
parallelism changes.
In the modified program the data is correctly range-partitioned (let's say
into 8 partitions if the default parallelism is 8).
After the partitioning, these 8 partitions have to be reduced to 3 partitions,
as defined by the map-partition operator with parallelism 3. This is done
by randomly shuffling the records, which destroys the range partitioning.

To preserve the partitioning in the map-partition operator with parallelism 3,
you have to set the parallelism of the partition operator to 3 as well.
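The effect can be modeled without Flink in a few lines of plain Java. This is a
sketch under stated assumptions, not Flink's implementation: the equal-width
range boundaries stand in for the sampled range partitioner, the round-robin
redistribution stands in for the shuffle described above, and all class and
method names are hypothetical. It partitions the sequence 0..10000 like the
test, feeds the result into a stage with parallelism 3, and applies the test's
min/max contiguity check.

```java
import java.util.ArrayList;
import java.util.List;

public class RangePartitionParallelismDemo {

    // Split [from, to] into `parts` contiguous key ranges. Equal-width boundaries
    // are an assumption standing in for the sampled range partitioner.
    static List<List<Long>> rangePartition(long from, long to, int parts) {
        List<List<Long>> out = new ArrayList<>();
        for (int i = 0; i < parts; i++) out.add(new ArrayList<>());
        long n = to - from + 1;
        for (long v = from; v <= to; v++) {
            out.get((int) ((v - from) * parts / n)).add(v);
        }
        return out;
    }

    // Hypothetical model of the channel between two operators: equal parallelism
    // forwards partition i to subtask i; different parallelism redistributes every
    // record round-robin (a stand-in for the shuffle), ignoring its key.
    static List<List<Long>> connect(List<List<Long>> upstream, int downstreamParallelism) {
        if (upstream.size() == downstreamParallelism) return upstream;
        List<List<Long>> out = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) out.add(new ArrayList<>());
        int next = 0;
        for (List<Long> partition : upstream) {
            for (Long v : partition) {
                out.get(next).add(v);
                next = (next + 1) % downstreamParallelism;
            }
        }
        return out;
    }

    // Same check as the test: ordered by minimum, each partition must start
    // exactly one past the previous partition's maximum.
    static boolean rangesAreContiguous(List<List<Long>> partitions) {
        List<long[]> minMax = new ArrayList<>();
        for (List<Long> p : partitions) {
            if (p.isEmpty()) continue;
            long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
            for (long v : p) {
                min = Math.min(min, v);
                max = Math.max(max, v);
            }
            minMax.add(new long[] {min, max});
        }
        minMax.sort((a, b) -> Long.compare(a[0], b[0]));
        for (int i = 1; i < minMax.size(); i++) {
            if (minMax.get(i)[0] != minMax.get(i - 1)[1] + 1) return false;
        }
        return true;
    }

    // Range-partition with `partitionParallelism`, then feed a map-partition
    // stage with parallelism 3, as in the adjusted test.
    static boolean pipeline(int partitionParallelism) {
        List<List<Long>> ranged = rangePartition(0, 10000, partitionParallelism);
        return rangesAreContiguous(connect(ranged, 3));
    }

    public static void main(String[] args) {
        System.out.println("partition p=8, mapPartition p=3: " + pipeline(8)); // false
        System.out.println("partition p=3, mapPartition p=3: " + pipeline(3)); // true
    }
}
```

With the partitioning step at parallelism 8 the contiguity check fails, and at
parallelism 3 it passes. In the DataSet API the second case corresponds to also
calling setParallelism(3) on the operator returned by partitionByRange, e.g.
dataSource.partitionByRange(keyExtractor).setParallelism(3)
.mapPartition(new MinMaxSelector()).setParallelism(3).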

Cheers, Fabian

2016-03-29 20:47 GMT+02:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com>:

> Hi all,
>
> I have recently been working on FLINK-2946
> <https://issues.apache.org/jira/browse/FLINK-2946> and I am supposed to use
> range partitioning, but I am not sure about its behaviour. I've slightly
> adjusted PartitionITCase#testRangePartitionerOnSequenceData to set a custom
> parallelism after the partitioning, and with that change the test fails.
> Is that the correct behaviour or is that a bug?
> I'd be grateful for any comments.
>
> The adjusted code:
>
> @Test
> public void testRangePartitionerOnSequenceData() throws Exception {
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     DataSource<Long> dataSource = env.generateSequence(0, 10000);
>     KeySelector<Long, Long> keyExtractor = new ObjectSelfKeySelector();
>
>     MapPartitionFunction<Long, Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector();
>
>     Comparator<Tuple2<Long, Long>> tuple2Comparator = new Tuple2Comparator();
>
>     // Range-partition with the default parallelism, then run the
>     // map-partition step with parallelism 3.
>     final MapPartitionOperator<Long, Tuple2<Long, Long>> dataSourceToCollect = dataSource
>         .partitionByRange(keyExtractor)
>         .mapPartition(minMaxSelector)
>         .setParallelism(3);
>
>     List<Tuple2<Long, Long>> collected = dataSourceToCollect.collect();
>     Collections.sort(collected, tuple2Comparator);
>
>     long previousMax = -1;
>     for (Tuple2<Long, Long> tuple2 : collected) {
>         if (previousMax == -1) {
>             previousMax = tuple2.f1;
>         } else {
>             long currentMin = tuple2.f0;
>             assertTrue(tuple2.f0 < tuple2.f1);
>             assertEquals(previousMax + 1, currentMin);
>             previousMax = tuple2.f1;
>         }
>     }
> }
>
