[ https://issues.apache.org/jira/browse/FLINK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15245824#comment-15245824 ]
ASF GitHub Bot commented on FLINK-3665: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1848#discussion_r60075060 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java --- @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception { result.collect(); // should fail } + + + @Test + public void testRangePartitionerOnSequenceDataWithOrders() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000) + .map(new MapFunction<Long, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Long value) throws Exception { + return new Tuple2<>(value / 5000, value % 5000); + } + }); + + final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true), + new LongComparator(false)); + + MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator); + + final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1) + .withOrders(Order.ASCENDING, Order.DESCENDING) + .mapPartition(minMaxSelector) + .collect(); + + Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator)); + + Tuple2<Long, Long> previousMax = null; + for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) { --- End diff -- I am not sure the tests to check the correctness of the range partitioning are correct. The MinMaxSelector returns the smallest and largest records of each partition. We must ensure that the partitions are not overlapping. I propose to 1. check for each minmax record that min <= max (just to be sure) 2. sort all minmax records in `collected` by the min value (ignore the max value) 3. check that the min value of each minmax record is larger than the max value of the previous minmax record. Does that make sense? > Range partitioning lacks support to define sort orders > ------------------------------------------------------ > > Key: FLINK-3665 > URL: https://issues.apache.org/jira/browse/FLINK-3665 > Project: Flink > Issue Type: Improvement > Components: DataSet API > Affects Versions: 1.0.0 > Reporter: Fabian Hueske > Fix For: 1.1.0 > > > {{DataSet.partitionByRange()}} does not allow to specify the sort order of > fields. This is fine if range partitioning is used to reduce skewed > partitioning. > However, it is not sufficient if range partitioning is used to sort a data > set in parallel. > Since {{DataSet.partitionByRange()}} is {{@Public}} API and cannot be easily > changed, I propose to add a method {{withOrders(Order... orders)}} to > {{PartitionOperator}}. The method should throw an exception if the > partitioning method of {{PartitionOperator}} is not range partitioning. -- This message was sent by Atlassian JIRA (v6.3.4#6332)