Hi all,

I have recently been working on FLINK-2946
<https://issues.apache.org/jira/browse/FLINK-2946>, for which I am supposed
to use range partitioning, but I am not sure about its behaviour. I slightly
adjusted PartitionITCase#testRangePartitionerOnSequenceData to set a custom
parallelism after the partitioning, and the test now fails.
Is that the expected behaviour, or is it a bug?
I would be grateful for any comments.

The adjusted code:

    @Test
    public void testRangePartitionerOnSequenceData() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Long> dataSource = env.generateSequence(0, 10000);
        KeySelector<Long, Long> keyExtractor = new ObjectSelfKeySelector();

        MapPartitionFunction<Long, Tuple2<Long, Long>> MinMaxSelector = new MinMaxSelector();

        Comparator<Tuple2<Long, Long>> tuple2Comparator = new Tuple2Comparator();

        final MapPartitionOperator<Long, Tuple2<Long, Long>> dataSourceToCollect = dataSource
                .partitionByRange(keyExtractor)
                .mapPartition(MinMaxSelector)
                .setParallelism(3);

        List<Tuple2<Long, Long>> collected = dataSourceToCollect.collect();
        Collections.sort(collected, tuple2Comparator);

        long previousMax = -1;
        for (Tuple2<Long, Long> tuple2 : collected) {
            if (previousMax == -1) {
                previousMax = tuple2.f1;
            } else {
                long currentMin = tuple2.f0;
                assertTrue(tuple2.f0 < tuple2.f1);
                assertEquals(previousMax + 1, currentMin);
                previousMax = tuple2.f1;
            }
        }
    }
