
ASF GitHub Bot commented on FLINK-3665:

Github user fhueske commented on a diff in the pull request:

    --- Diff: 
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws 
Exception {
                result.collect(); // should fail
    +   @Test
    +   public void testRangePartitionerOnSequenceDataWithOrders() throws 
Exception {
    +           final ExecutionEnvironment env = 
    +           DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 
    +                           .map(new MapFunction<Long, Tuple2<Long, 
Long>>() {
    +                   @Override
    +                   public Tuple2<Long, Long> map(Long value) throws 
Exception {
    +                           return new Tuple2<>(value / 5000, value % 5000);
    +                   }
    +           });
    +           final Tuple2Comparator<Long> tuple2Comparator = new 
Tuple2Comparator<>(new LongComparator(true),
    +           MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new 
    +           final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> 
collected = dataSet.partitionByRange(0, 1)
    +                           .withOrders(Order.ASCENDING, Order.DESCENDING)
    +                           .mapPartition(minMaxSelector)
    +                           .collect();
    +           Collections.sort(collected, new 
    +           Tuple2<Long, Long> previousMax = null;
    +           for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : 
collected) {
    +                   if (previousMax == null) {
    +                           assertTrue(tuple2Comparator.compare(tuple2.f0, 
tuple2.f1) < 0);
    +                           previousMax = tuple2.f1;
    +                   } else {
    +                           assertTrue(tuple2Comparator.compare(tuple2.f0, 
tuple2.f1) < 0);
    +                           if (previousMax.f0.equals(tuple2.f0.f0)) {
    +                                   assertEquals(previousMax.f1 - 1, 
    +                           }
    +                           previousMax = tuple2.f1;
    +                   }
    +           }
    +   }
    +   @Test
    +   public void testRangePartitionerOnSequenceNestedDataWithOrders() throws 
Exception {
    +           final ExecutionEnvironment env = 
    +           final DataSet<Tuple2<Tuple2<Long, Long>, Long>> dataSet = 
env.generateSequence(0, 10000)
    +                           .map(new MapFunction<Long, Tuple2<Tuple2<Long, 
Long>, Long>>() {
    +                                   @Override
    +                                   public Tuple2<Tuple2<Long, Long>, Long> 
map(Long value) throws Exception {
    +                                           return new Tuple2<>(new 
Tuple2<>(value / 5000, value % 5000), value);
    +                                   }
    +                           });
    +           final Tuple2Comparator<Long> tuple2Comparator = new 
Tuple2Comparator<>(new LongComparator(true),
    +                           new LongComparator(true));
    +           MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new 
    +           final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> 
collected = dataSet.partitionByRange(0)
    +                           .withOrders(Order.ASCENDING)
    +                           .mapPartition(new 
MapPartitionFunction<Tuple2<Tuple2<Long,Long>,Long>, Tuple2<Long, Long>>() {
    +                                   @Override
    +                                   public void 
mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> values,
         Collector<Tuple2<Long, Long>> out) throws Exception {
    +                                           for (Tuple2<Tuple2<Long, Long>, 
Long> value : values) {
    +                                                   out.collect(value.f0);
    +                                           }
    +                                   }
    +                           })
    +                           .mapPartition(minMaxSelector)
    +                           .collect();
    +           Collections.sort(collected, new 
    +           Tuple2<Long, Long> previousMax = null;
    +           for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : 
collected) {
    +                   if (previousMax == null) {
    +                           assertTrue(tuple2Comparator.compare(tuple2.f0, 
tuple2.f1) < 0);
    +                           previousMax = tuple2.f1;
    +                   } else {
    +                           assertTrue(tuple2Comparator.compare(tuple2.f0, 
tuple2.f1) < 0);
    +                           if (previousMax.f0.equals(tuple2.f0.f0)) {
    +                                   assertEquals(previousMax.f1 + 1, 
    +                           }
    +                           previousMax = tuple2.f1;
    +                   }
    +           }
    +   }
    +   @Test
    +   public void 
testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws 
Exception {
    +           final ExecutionEnvironment env = 
    +           final DataSet<Tuple2<ComparablePojo, Long>> dataSet = 
env.generateSequence(0, 10000)
    +                           .map(new MapFunction<Long, 
Tuple2<ComparablePojo, Long>>() {
    +                                   @Override
    +                                   public Tuple2<ComparablePojo, Long> 
map(Long value) throws Exception {
    +                                           return new Tuple2<>(new 
ComparablePojo(value / 5000, value % 5000), value);
    +                                   }
    +                           });
    +           final List<Tuple2<ComparablePojo, ComparablePojo>> collected = 
    +                           .partitionByRange(new 
KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>() {
    +                                   @Override
    +                                   public ComparablePojo 
getKey(Tuple2<ComparablePojo, Long> value) throws Exception {
    +                                           return value.f0;
    +                                   }
    +                           })
    +                           .withOrders(Order.ASCENDING)
    +                           .mapPartition(new MinMaxSelector<>(new 
    +                           .mapPartition(new ExtractComparablePojo())
    +                           .collect();
    +           final Comparator<Tuple2<ComparablePojo, ComparablePojo>> 
pojoComparator =
    +                           new Comparator<Tuple2<ComparablePojo, 
ComparablePojo>>() {
    +                   @Override
    +                   public int compare(Tuple2<ComparablePojo, 
ComparablePojo> o1,
Tuple2<ComparablePojo, ComparablePojo> o2) {
    +                           return o1.f0.compareTo(o2.f1);
    +                   }
    +           };
    +           Collections.sort(collected, pojoComparator);
    +           ComparablePojo previousMax = null;
    +           for (Tuple2<ComparablePojo, ComparablePojo> element : 
collected) {
    +                   if (previousMax == null) {
    +                           assertTrue(element.f0.compareTo(element.f1) < 
    +                           previousMax = element.f1;
    +                   } else {
    +                           assertTrue(element.f0.compareTo(element.f1) < 
    +                           if (previousMax.first.equals(element.f0.first)) 
    +                                   assertEquals(previousMax.second - 1, 
    +                           }
    +                           previousMax = element.f1;
    +                   }
    +           }
    +   }
    +   private static class ExtractComparablePojo implements 
    +                   Tuple2<Tuple2<ComparablePojo, Long>, 
Tuple2<ComparablePojo, Long>>,
    +                   Tuple2<ComparablePojo, ComparablePojo>> {
    +           @Override
    +           public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, 
Long>, Tuple2<ComparablePojo, Long>>> values,
Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception {
    +                   for (Tuple2<Tuple2<ComparablePojo, Long>, 
Tuple2<ComparablePojo, Long>> value : values) {
    +                           out.collect(new Tuple2<>(value.f0.f0, 
    +                   }
    +           }
    +   }
    +    private static class ComparablePojoComparator implements 
Comparator<Tuple2<ComparablePojo, Long>>, Serializable {
    +           @Override
    +           public int compare(Tuple2<ComparablePojo, Long> o1,
    +                                              Tuple2<ComparablePojo, Long> 
o2) {
    +                   return o1.f0.compareTo(o2.f0);
    +           }
    +   }
    +   private static class ComparablePojo implements 
Comparable<ComparablePojo> {
    +           private Long first;
    +           private Long second;
    +           public Long getFirst() {
    +                   return first;
    +           }
    +           public void setFirst(Long first) {
    +                   this.first = first;
    +           }
    +           public Long getSecond() {
    +                   return second;
    +           }
    +           public void setSecond(Long second) {
    +                   this.second = second;
    +           }
    +           public ComparablePojo(Long first,
    +                                                     Long second) {
    +                   this.first = first;
    +                   this.second = second;
    +           }
    +           public ComparablePojo() {
    +           }
    +           @Override
    +           public int compareTo(ComparablePojo o) {
    +                   final int firstResult = Long.compare(this.first, 
    +                   if (firstResult == 0) {
    +                           return (-1) * Long.compare(this.second, 
    +                   }
    +                   return firstResult;
    +           }
    +   }
        private static class ObjectSelfKeySelector implements KeySelector<Long, 
Long> {
                public Long getKey(Long value) throws Exception {
                        return value;
    -   private static class MinMaxSelector implements 
MapPartitionFunction<Long, Tuple2<Long, Long>> {
    +   private static class MinMaxSelector<T> implements 
MapPartitionFunction<T, Tuple2<T, T>> {
    +           private final Comparator<T> comparator;
    +           public MinMaxSelector(Comparator<T> comparator) {
    +                   this.comparator = comparator;
    +           }
    -           public void mapPartition(Iterable<Long> values, 
Collector<Tuple2<Long, Long>> out) throws Exception {
    -                   long max = Long.MIN_VALUE;
    -                   long min = Long.MAX_VALUE;
    -                   for (long value : values) {
    -                           if (value > max) {
    +           public void mapPartition(Iterable<T> values, 
Collector<Tuple2<T, T>> out) throws Exception {
    +                   Iterator<T> itr = values.iterator();
    +                   T min = itr.next();
    +                   T max = min;
    +                   T value;
    +                   while (itr.hasNext()) {
    +                           value= itr.next();
    +                           if (comparator.compare(value, min) < 0) {
    +                                   min = value;
    +                           }
    +                           if (comparator.compare(value, max) > 0) {
                                        max = value;
    -                           if (value < min) {
    -                                   min = value;
    -                           }
    -                   Tuple2<Long, Long> result = new Tuple2<>(min, max);
    +                   Tuple2<T, T> result = new Tuple2<>(min, max);
    -   private static class Tuple2Comparator implements 
Comparator<Tuple2<Long, Long>> {
    +   private static class Tuple2Comparator<T> implements 
Comparator<Tuple2<T, T>>, Serializable {
    +           private final Comparator<T> firstComparator;
    +           private final Comparator<T> secondComparator;
    +           public Tuple2Comparator(Comparator<T> comparator) {
    +                   this(comparator, comparator);
    +           }
    +           public Tuple2Comparator(Comparator<T> firstComparator,
    +                                                           Comparator<T> 
secondComparator) {
    +                   this.firstComparator = firstComparator;
    +                   this.secondComparator = secondComparator;
    +           }
    -           public int compare(Tuple2<Long, Long> first, Tuple2<Long, Long> 
second) {
    -                   long result = first.f0 - second.f0;
    +           public int compare(Tuple2<T, T> first, Tuple2<T, T> second) {
    +                   long result = firstComparator.compare(first.f0, 
                        if (result > 0) {
                                return 1;
                        } else if (result < 0) {
                                return -1;
    -                   result = first.f1 - second.f1;
    +                   result = secondComparator.compare(first.f1, second.f1);
    --- End diff --
    I think we should only order on the min-element of the records (`first.f0` 
and `second.f0`). If `first.f0 == second.f0`, the test must fail, because it 
would indicate that the same record ended up in two partitions, which is not correct.

> Range partitioning lacks support to define sort orders
> ------------------------------------------------------
>                 Key: FLINK-3665
>                 URL: https://issues.apache.org/jira/browse/FLINK-3665
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Fabian Hueske
>             Fix For: 1.1.0
> {{DataSet.partitionByRange()}} does not allow specifying the sort order of 
> the partitioning fields. This is fine if range partitioning is only used to 
> reduce skew across partitions. 
> However, it is not sufficient if range partitioning is used to sort a data 
> set in parallel. 
> Since {{DataSet.partitionByRange()}} is a {{@Public}} API and cannot easily be 
> changed, I propose adding a method {{withOrders(Order... orders)}} to 
> {{PartitionOperator}}. The method should throw an exception if the 
> partitioning method of {{PartitionOperator}} is not range partitioning.

This message was sent by Atlassian JIRA

Reply via email to