[ https://issues.apache.org/jira/browse/FLINK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15245829#comment-15245829 ]
ASF GitHub Bot commented on FLINK-3665: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1848#discussion_r60075546 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java --- @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception { result.collect(); // should fail } + + + @Test + public void testRangePartitionerOnSequenceDataWithOrders() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000) + .map(new MapFunction<Long, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Long value) throws Exception { + return new Tuple2<>(value / 5000, value % 5000); + } + }); + + final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true), + new LongComparator(false)); + + MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator); + + final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1) + .withOrders(Order.ASCENDING, Order.DESCENDING) + .mapPartition(minMaxSelector) + .collect(); + + Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator)); + + Tuple2<Long, Long> previousMax = null; + for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) { + if (previousMax == null) { + assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0); + previousMax = tuple2.f1; + } else { + assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0); + if (previousMax.f0.equals(tuple2.f0.f0)) { + assertEquals(previousMax.f1 - 1, tuple2.f0.f1.longValue()); + } + previousMax = tuple2.f1; + } + } + } + + @Test + public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception { + final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + final DataSet<Tuple2<Tuple2<Long, Long>, Long>> dataSet = env.generateSequence(0, 10000) + .map(new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>() { + @Override + public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception { + return new Tuple2<>(new Tuple2<>(value / 5000, value % 5000), value); + } + }); + + final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true), + new LongComparator(true)); + MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator); + + final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0) + .withOrders(Order.ASCENDING) + .mapPartition(new MapPartitionFunction<Tuple2<Tuple2<Long,Long>,Long>, Tuple2<Long, Long>>() { + @Override + public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> values, + Collector<Tuple2<Long, Long>> out) throws Exception { + for (Tuple2<Tuple2<Long, Long>, Long> value : values) { + out.collect(value.f0); + } + } + }) + .mapPartition(minMaxSelector) + .collect(); + + Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator)); + + Tuple2<Long, Long> previousMax = null; + for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) { + if (previousMax == null) { + assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0); + previousMax = tuple2.f1; + } else { + assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0); + if (previousMax.f0.equals(tuple2.f0.f0)) { + assertEquals(previousMax.f1 + 1, tuple2.f0.f1.longValue()); + } + previousMax = tuple2.f1; + } + } + } + + @Test + public void testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final DataSet<Tuple2<ComparablePojo, Long>> dataSet = env.generateSequence(0, 10000) + .map(new MapFunction<Long, Tuple2<ComparablePojo, 
Long>>() { + @Override + public Tuple2<ComparablePojo, Long> map(Long value) throws Exception { + return new Tuple2<>(new ComparablePojo(value / 5000, value % 5000), value); + } + }); + + final List<Tuple2<ComparablePojo, ComparablePojo>> collected = dataSet + .partitionByRange(new KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>() { + @Override + public ComparablePojo getKey(Tuple2<ComparablePojo, Long> value) throws Exception { + return value.f0; + } + }) + .withOrders(Order.ASCENDING) + .mapPartition(new MinMaxSelector<>(new ComparablePojoComparator())) + .mapPartition(new ExtractComparablePojo()) + .collect(); + + final Comparator<Tuple2<ComparablePojo, ComparablePojo>> pojoComparator = + new Comparator<Tuple2<ComparablePojo, ComparablePojo>>() { + @Override + public int compare(Tuple2<ComparablePojo, ComparablePojo> o1, + Tuple2<ComparablePojo, ComparablePojo> o2) { + return o1.f0.compareTo(o2.f1); + } + }; + Collections.sort(collected, pojoComparator); + + ComparablePojo previousMax = null; + for (Tuple2<ComparablePojo, ComparablePojo> element : collected) { + if (previousMax == null) { + assertTrue(element.f0.compareTo(element.f1) < 0); + previousMax = element.f1; + } else { + assertTrue(element.f0.compareTo(element.f1) < 0); + if (previousMax.first.equals(element.f0.first)) { + assertEquals(previousMax.second - 1, element.f0.second.longValue()); + } + previousMax = element.f1; + } + } + } + + private static class ExtractComparablePojo implements MapPartitionFunction< + Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>, + Tuple2<ComparablePojo, ComparablePojo>> { + + @Override + public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> values, + Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception { + for (Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>> value : values) { + out.collect(new Tuple2<>(value.f0.f0, value.f1.f0)); + } + } + } + + private 
static class ComparablePojoComparator implements Comparator<Tuple2<ComparablePojo, Long>>, Serializable { + + @Override + public int compare(Tuple2<ComparablePojo, Long> o1, + Tuple2<ComparablePojo, Long> o2) { + return o1.f0.compareTo(o2.f0); + } + } + + private static class ComparablePojo implements Comparable<ComparablePojo> { + private Long first; + private Long second; + + public Long getFirst() { + return first; + } + + public void setFirst(Long first) { + this.first = first; + } + + public Long getSecond() { + return second; + } + + public void setSecond(Long second) { + this.second = second; + } + + public ComparablePojo(Long first, + Long second) { + this.first = first; + this.second = second; + } + + public ComparablePojo() { + } + + @Override + public int compareTo(ComparablePojo o) { + final int firstResult = Long.compare(this.first, o.first); + if (firstResult == 0) { + return (-1) * Long.compare(this.second, o.second); + } + + return firstResult; + } + } + private static class ObjectSelfKeySelector implements KeySelector<Long, Long> { @Override public Long getKey(Long value) throws Exception { return value; } } - private static class MinMaxSelector implements MapPartitionFunction<Long, Tuple2<Long, Long>> { + private static class MinMaxSelector<T> implements MapPartitionFunction<T, Tuple2<T, T>> { + + private final Comparator<T> comparator; + + public MinMaxSelector(Comparator<T> comparator) { + this.comparator = comparator; + } + @Override - public void mapPartition(Iterable<Long> values, Collector<Tuple2<Long, Long>> out) throws Exception { - long max = Long.MIN_VALUE; - long min = Long.MAX_VALUE; - for (long value : values) { - if (value > max) { + public void mapPartition(Iterable<T> values, Collector<Tuple2<T, T>> out) throws Exception { + Iterator<T> itr = values.iterator(); + T min = itr.next(); + T max = min; + T value; + while (itr.hasNext()) { + value= itr.next(); + if (comparator.compare(value, min) < 0) { + min = value; + } + if 
(comparator.compare(value, max) > 0) { max = value; } - if (value < min) { - min = value; - } } - Tuple2<Long, Long> result = new Tuple2<>(min, max); + + Tuple2<T, T> result = new Tuple2<>(min, max); out.collect(result); } } - private static class Tuple2Comparator implements Comparator<Tuple2<Long, Long>> { + private static class Tuple2Comparator<T> implements Comparator<Tuple2<T, T>>, Serializable { + + private final Comparator<T> firstComparator; + private final Comparator<T> secondComparator; + + public Tuple2Comparator(Comparator<T> comparator) { + this(comparator, comparator); + } + + public Tuple2Comparator(Comparator<T> firstComparator, + Comparator<T> secondComparator) { + this.firstComparator = firstComparator; + this.secondComparator = secondComparator; + } + @Override - public int compare(Tuple2<Long, Long> first, Tuple2<Long, Long> second) { - long result = first.f0 - second.f0; + public int compare(Tuple2<T, T> first, Tuple2<T, T> second) { + long result = firstComparator.compare(first.f0, second.f0); if (result > 0) { return 1; } else if (result < 0) { return -1; } - result = first.f1 - second.f1; + result = secondComparator.compare(first.f1, second.f1); --- End diff -- I think we should only order on the min-element of the records (`first.f0` and `second.f0`). If `first.f0 == second.f0`, the test must fail, because it would indicate that the same record ended up in two partitions, which is not allowed. > Range partitioning lacks support to define sort orders > ------------------------------------------------------ > > Key: FLINK-3665 > URL: https://issues.apache.org/jira/browse/FLINK-3665 > Project: Flink > Issue Type: Improvement > Components: DataSet API > Affects Versions: 1.0.0 > Reporter: Fabian Hueske > Fix For: 1.1.0 > > > {{DataSet.partitionByRange()}} does not allow specifying the sort order of > fields. This is fine if range partitioning is used to reduce skewed > partitioning. 
> However, it is not sufficient if range partitioning is used to sort a data > set in parallel. > Since {{DataSet.partitionByRange()}} is {{@Public}} API and cannot be easily > changed, I propose to add a method {{withOrders(Order... orders)}} to > {{PartitionOperator}}. The method should throw an exception if the > partitioning method of {{PartitionOperator}} is not range partitioning. -- This message was sent by Atlassian JIRA (v6.3.4#6332)