[ https://issues.apache.org/jira/browse/FLINK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15245824#comment-15245824 ]

ASF GitHub Bot commented on FLINK-3665:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60075060
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception {
                result.collect(); // should fail
        }
     
    +
    +
    +   @Test
    +   public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +           final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +           DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +                           .map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +                   @Override
    +                   public Tuple2<Long, Long> map(Long value) throws Exception {
    +                           return new Tuple2<>(value / 5000, value % 5000);
    +                   }
    +           });
    +
    +           final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +                           new LongComparator(false));
    +
    +           MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +           final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +                           .withOrders(Order.ASCENDING, Order.DESCENDING)
    +                           .mapPartition(minMaxSelector)
    +                           .collect();
    +
    +           Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +           Tuple2<Long, Long> previousMax = null;
    +           for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    --- End diff --
    
    I am not sure that these tests actually verify the correctness of the range partitioning. The MinMaxSelector returns the smallest and largest records of each partition. We must ensure that the partitions do not overlap. I propose to
    
    1. check for each minmax record that min <= max (just to be sure)
    2. sort all minmax records in `collected` by the min value (ignoring the max value)
    3. check that the min value of each minmax record is larger than the max value of the previous minmax record.
    
    Does that make sense?
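
The three steps proposed above could be sketched roughly as follows. This is only an illustration in plain Java, not the code from the pull request: the `MinMax` class and the `partitionsAreDisjoint` helper are hypothetical names, and each record is simplified to a single `long` key in ascending order. For the composite keys with mixed orders used in the test, the comparisons would have to go through the same comparator that defines the partitioning order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RangePartitionCheck {

    /** Hypothetical stand-in for one minmax record: the smallest and largest key of a partition. */
    static final class MinMax {
        final long min;
        final long max;

        MinMax(long min, long max) {
            this.min = min;
            this.max = max;
        }
    }

    /** Returns true if every record has min <= max and no two partitions overlap. */
    static boolean partitionsAreDisjoint(List<MinMax> collected) {
        // Step 1: each record must satisfy min <= max.
        for (MinMax record : collected) {
            if (record.min > record.max) {
                return false;
            }
        }

        // Step 2: sort by the min value only; the max value is ignored here.
        List<MinMax> sorted = new ArrayList<>(collected);
        sorted.sort((a, b) -> Long.compare(a.min, b.min));

        // Step 3: each min must be strictly larger than the previous record's max.
        MinMax previous = null;
        for (MinMax current : sorted) {
            if (previous != null && current.min <= previous.max) {
                return false;
            }
            previous = current;
        }
        return true;
    }

    public static void main(String[] args) {
        List<MinMax> disjoint = Arrays.asList(new MinMax(10, 19), new MinMax(0, 9));
        List<MinMax> overlapping = Arrays.asList(new MinMax(0, 12), new MinMax(10, 19));
        System.out.println(partitionsAreDisjoint(disjoint));
        System.out.println(partitionsAreDisjoint(overlapping));
    }
}
```

Sorting by the min value alone is enough here: once step 1 holds, comparing each min against the previous max catches any overlap between neighbouring ranges.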


> Range partitioning lacks support to define sort orders
> ------------------------------------------------------
>
>                 Key: FLINK-3665
>                 URL: https://issues.apache.org/jira/browse/FLINK-3665
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Fabian Hueske
>             Fix For: 1.1.0
>
>
> {{DataSet.partitionByRange()}} does not allow specifying the sort order of 
> fields. This is fine if range partitioning is used to reduce skewed 
> partitioning. 
> However, it is not sufficient if range partitioning is used to sort a data 
> set in parallel. 
> Since {{DataSet.partitionByRange()}} is {{@Public}} API and cannot be easily 
> changed, I propose to add a method {{withOrders(Order... orders)}} to 
> {{PartitionOperator}}. The method should throw an exception if the 
> partitioning method of {{PartitionOperator}} is not range partitioning.
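
The behaviour described in the issue could look roughly like the sketch below. It is a self-contained illustration and not Flink's actual `PartitionOperator`: `SketchPartitionOperator`, the `PartitionMethod` enum, and the choice of `IllegalStateException` are assumptions made for this example, since the issue only says that "an exception" should be thrown.

```java
import java.util.Arrays;
import java.util.List;

class WithOrdersSketch {

    /** Hypothetical sort-order enum, mirroring the ASCENDING/DESCENDING values used in the test. */
    enum Order { ASCENDING, DESCENDING }

    /** Hypothetical set of partitioning methods; only RANGE accepts orders. */
    enum PartitionMethod { HASH, RANGE, REBALANCE }

    /** Hypothetical stand-in for an operator that remembers its partitioning method. */
    static final class SketchPartitionOperator {
        private final PartitionMethod method;
        private List<Order> orders;

        SketchPartitionOperator(PartitionMethod method) {
            this.method = method;
        }

        /** Sets one order per key field; rejects the call unless range partitioning is used. */
        SketchPartitionOperator withOrders(Order... orders) {
            if (method != PartitionMethod.RANGE) {
                // Assumption: the exception type is not specified in the issue.
                throw new IllegalStateException(
                        "withOrders() requires range partitioning, but the method is " + method);
            }
            this.orders = Arrays.asList(orders.clone());
            return this;
        }

        List<Order> getOrders() {
            return orders;
        }
    }
}
```

Returning `this` keeps the call chainable, in the same style as the `partitionByRange(0, 1).withOrders(...)` call in the test under review.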



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)