[ https://issues.apache.org/jira/browse/FLINK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249916#comment-15249916 ]
ASF GitHub Bot commented on FLINK-3665:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60409121

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,274 @@ public void testRangePartitionInIteration() throws Exception {
             result.collect(); // should fail
         }
    +
    +
    +    @Test
    +    public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +        DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +                .map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +                    @Override
    +                    public Tuple2<Long, Long> map(Long value) throws Exception {
    +                        return new Tuple2<>(value / 5000, value % 5000);
    +                    }
    +                });
    +
    +        final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +                new LongComparator(false));
    +
    +        MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +        final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +                .withOrders(Order.ASCENDING, Order.DESCENDING)
    +                .mapPartition(minMaxSelector)
    +                .collect();
    +
    +        Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +        Tuple2<Long, Long> previousMax = null;
    +        for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +            assertTrue("Min element in each partition should be smaller than max.",
    +                    tuple2Comparator.compare(tuple2.f0, tuple2.f1) <= 0);
    +            if (previousMax == null) {
    +                previousMax = tuple2.f1;
    +            } else {
    +                assertTrue("Partitions overlap. Previous max should be smaller than current min.",
    +                        tuple2Comparator.compare(previousMax, tuple2.f0) <= 0);
    --- End diff --

    Right, will fix it.
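The invariant this test asserts can be sketched in plain Java without Flink. This is a minimal sketch under stated assumptions: `RangeCheck` and `Key` are hypothetical names, `Key` stands in for `Tuple2<Long, Long>`, and the composite order (field 0 ascending, field 1 descending) is assumed to match what `Tuple2Comparator(new LongComparator(true), new LongComparator(false))` does in the test. Each partition contributes one (min, max) pair; after sorting the pairs by min, no min may exceed its own max, and no partition's max may exceed the next partition's min. The sketch advances `previousMax` after every partition so that each pair of neighbouring partitions is compared, and it uses `<= 0` like the test, so a boundary key shared by two partitions is tolerated.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper: checks that per-partition [min, max] ranges are ordered and disjoint. */
final class RangeCheck {

    /** Hypothetical (f0, f1) key, standing in for Flink's Tuple2<Long, Long>. */
    static final class Key {
        final long f0;
        final long f1;

        Key(long f0, long f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    /** f0 ascending, ties broken by f1 descending, as in withOrders(ASCENDING, DESCENDING). */
    static final Comparator<Key> ORDER =
            Comparator.<Key>comparingLong(k -> k.f0)
                    .thenComparing(Comparator.<Key>comparingLong(k -> k.f1).reversed());

    /** Each element is {min, max} of one partition; returns true if the ranges do not overlap. */
    static boolean nonOverlapping(List<Key[]> minMaxPerPartition) {
        List<Key[]> sorted = new ArrayList<>(minMaxPerPartition);
        sorted.sort((a, b) -> ORDER.compare(a[0], b[0])); // order partitions by their min key

        Key previousMax = null;
        for (Key[] range : sorted) {
            Key min = range[0];
            Key max = range[1];
            if (ORDER.compare(min, max) > 0) {
                return false; // a partition's min must not come after its max
            }
            if (previousMax != null && ORDER.compare(previousMax, min) > 0) {
                return false; // previous partition reaches into this one
            }
            previousMax = max; // compare the next partition against this one
        }
        return true;
    }
}
```

With the test's data set, `(value / 5000, value % 5000)` for values 0 to 10000, the smallest key under this order is (0, 4999) and the largest is (2, 0).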
> Range partitioning lacks support to define sort orders
> ------------------------------------------------------
>
>                 Key: FLINK-3665
>                 URL: https://issues.apache.org/jira/browse/FLINK-3665
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Fabian Hueske
>             Fix For: 1.1.0
>
>
> {{DataSet.partitionByRange()}} does not allow specifying the sort order of
> fields. This is fine if range partitioning is used to reduce skewed
> partitioning.
> However, it is not sufficient if range partitioning is used to sort a data
> set in parallel.
> Since {{DataSet.partitionByRange()}} is a {{@Public}} API and cannot be easily
> changed, I propose adding a method {{withOrders(Order... orders)}} to
> {{PartitionOperator}}. The method should throw an exception if the
> partitioning method of {{PartitionOperator}} is not range partitioning.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
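The contract proposed in the issue can be sketched as a plain-Java stand-in. This is an illustration under stated assumptions, not Flink code: `PartitionSpec`, its `Method` enum and `recordComparator()` are hypothetical names, records are modelled as `long[]` rows, and the check that the number of orders equals the number of key fields is an added assumption the issue does not state. The only behavior taken from the issue is that `withOrders(...)` must throw when the partitioning method is not range partitioning.

```java
import java.util.Comparator;

/** Hypothetical stand-in for Flink's PartitionOperator, illustrating the proposed withOrders(...) contract. */
final class PartitionSpec {
    /** Hypothetical partitioning methods; Flink's own enum is not reproduced here. */
    enum Method { HASH, RANGE }

    /** Sort direction per key field, mirroring Order.ASCENDING / Order.DESCENDING. */
    enum Order { ASCENDING, DESCENDING }

    private final Method method;
    private final int[] keyFields;
    private Order[] orders; // null until withOrders(...) is called; then treated as all ascending

    PartitionSpec(Method method, int... keyFields) {
        this.method = method;
        this.keyFields = keyFields.clone();
    }

    /** Proposed behavior: reject orders unless the operator uses range partitioning. */
    PartitionSpec withOrders(Order... orders) {
        if (method != Method.RANGE) {
            throw new IllegalStateException("Orders can only be specified for range partitioning.");
        }
        // Assumption (not in the issue): exactly one order per key field.
        if (orders.length != keyFields.length) {
            throw new IllegalArgumentException(
                    "Expected " + keyFields.length + " orders but got " + orders.length);
        }
        this.orders = orders.clone();
        return this;
    }

    /** Composite comparator over long[] rows: key fields in declaration order, each with its direction. */
    Comparator<long[]> recordComparator() {
        Comparator<long[]> result = (a, b) -> 0;
        for (int i = 0; i < keyFields.length; i++) {
            final int field = keyFields[i];
            Comparator<long[]> byField = Comparator.comparingLong(row -> row[field]);
            if (orders != null && orders[i] == Order.DESCENDING) {
                byField = byField.reversed();
            }
            result = result.thenComparing(byField);
        }
        return result;
    }
}
```

A range spec on fields 0 and 1 with `withOrders(ASCENDING, DESCENDING)` sorts rows by field 0 ascending and breaks ties by field 1 descending, which is the ordering a parallel sort needs its range boundaries to respect; calling `withOrders` on a `HASH` spec throws.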