Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/1848#discussion_r60391643 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java --- @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception { result.collect(); // should fail } + + + @Test + public void testRangePartitionerOnSequenceDataWithOrders() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000) + .map(new MapFunction<Long, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Long value) throws Exception { + return new Tuple2<>(value / 5000, value % 5000); + } + }); + + final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true), + new LongComparator(false)); + + MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator); + + final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1) + .withOrders(Order.ASCENDING, Order.DESCENDING) + .mapPartition(minMaxSelector) + .collect(); + + Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator)); + + Tuple2<Long, Long> previousMax = null; + for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) { + if (previousMax == null) { + assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0); + previousMax = tuple2.f1; + } else { + assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0); + if (previousMax.f0.equals(tuple2.f0.f0)) { + assertEquals(previousMax.f1 - 1, tuple2.f0.f1.longValue()); + } + previousMax = tuple2.f1; + } + } + } + + @Test + public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final DataSet<Tuple2<Tuple2<Long, Long>, Long>> dataSet = 
env.generateSequence(0, 10000) + .map(new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>() { + @Override + public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception { + return new Tuple2<>(new Tuple2<>(value / 5000, value % 5000), value); + } + }); + + final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true), + new LongComparator(true)); + MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator); + + final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0) + .withOrders(Order.ASCENDING) + .mapPartition(new MapPartitionFunction<Tuple2<Tuple2<Long,Long>,Long>, Tuple2<Long, Long>>() { + @Override + public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> values, + Collector<Tuple2<Long, Long>> out) throws Exception { + for (Tuple2<Tuple2<Long, Long>, Long> value : values) { + out.collect(value.f0); + } + } + }) + .mapPartition(minMaxSelector) + .collect(); + + Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator)); + + Tuple2<Long, Long> previousMax = null; + for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) { + if (previousMax == null) { + assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0); + previousMax = tuple2.f1; + } else { + assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0); + if (previousMax.f0.equals(tuple2.f0.f0)) { + assertEquals(previousMax.f1 + 1, tuple2.f0.f1.longValue()); + } + previousMax = tuple2.f1; + } + } + } + + @Test + public void testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final DataSet<Tuple2<ComparablePojo, Long>> dataSet = env.generateSequence(0, 10000) + .map(new MapFunction<Long, Tuple2<ComparablePojo, Long>>() { + @Override + public Tuple2<ComparablePojo, Long> map(Long value) throws Exception { + return new 
Tuple2<>(new ComparablePojo(value / 5000, value % 5000), value); + } + }); + + final List<Tuple2<ComparablePojo, ComparablePojo>> collected = dataSet + .partitionByRange(new KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>() { + @Override + public ComparablePojo getKey(Tuple2<ComparablePojo, Long> value) throws Exception { + return value.f0; + } + }) + .withOrders(Order.ASCENDING) + .mapPartition(new MinMaxSelector<>(new ComparablePojoComparator())) + .mapPartition(new ExtractComparablePojo()) + .collect(); + + final Comparator<Tuple2<ComparablePojo, ComparablePojo>> pojoComparator = + new Comparator<Tuple2<ComparablePojo, ComparablePojo>>() { + @Override + public int compare(Tuple2<ComparablePojo, ComparablePojo> o1, + Tuple2<ComparablePojo, ComparablePojo> o2) { + return o1.f0.compareTo(o2.f1); + } + }; + Collections.sort(collected, pojoComparator); + + ComparablePojo previousMax = null; + for (Tuple2<ComparablePojo, ComparablePojo> element : collected) { + if (previousMax == null) { + assertTrue(element.f0.compareTo(element.f1) < 0); + previousMax = element.f1; + } else { + assertTrue(element.f0.compareTo(element.f1) < 0); + if (previousMax.first.equals(element.f0.first)) { + assertEquals(previousMax.second - 1, element.f0.second.longValue()); + } + previousMax = element.f1; + } + } + } + + private static class ExtractComparablePojo implements MapPartitionFunction< + Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>, + Tuple2<ComparablePojo, ComparablePojo>> { + + @Override + public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> values, + Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception { + for (Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>> value : values) { + out.collect(new Tuple2<>(value.f0.f0, value.f1.f0)); + } + } + } + + private static class ComparablePojoComparator implements Comparator<Tuple2<ComparablePojo, Long>>, Serializable { + + 
@Override + public int compare(Tuple2<ComparablePojo, Long> o1, + Tuple2<ComparablePojo, Long> o2) { + return o1.f0.compareTo(o2.f0); + } + } + + private static class ComparablePojo implements Comparable<ComparablePojo> { + private Long first; + private Long second; + + public Long getFirst() { + return first; + } + + public void setFirst(Long first) { + this.first = first; + } + + public Long getSecond() { + return second; + } + + public void setSecond(Long second) { + this.second = second; + } + + public ComparablePojo(Long first, + Long second) { + this.first = first; + this.second = second; + } + + public ComparablePojo() { + } + + @Override + public int compareTo(ComparablePojo o) { + final int firstResult = Long.compare(this.first, o.first); + if (firstResult == 0) { + return (-1) * Long.compare(this.second, o.second); + } + + return firstResult; + } + } + private static class ObjectSelfKeySelector implements KeySelector<Long, Long> { @Override public Long getKey(Long value) throws Exception { return value; } } - private static class MinMaxSelector implements MapPartitionFunction<Long, Tuple2<Long, Long>> { + private static class MinMaxSelector<T> implements MapPartitionFunction<T, Tuple2<T, T>> { + + private final Comparator<T> comparator; + + public MinMaxSelector(Comparator<T> comparator) { + this.comparator = comparator; + } + @Override - public void mapPartition(Iterable<Long> values, Collector<Tuple2<Long, Long>> out) throws Exception { - long max = Long.MIN_VALUE; - long min = Long.MAX_VALUE; - for (long value : values) { - if (value > max) { + public void mapPartition(Iterable<T> values, Collector<Tuple2<T, T>> out) throws Exception { + Iterator<T> itr = values.iterator(); + T min = itr.next(); + T max = min; + T value; + while (itr.hasNext()) { + value= itr.next(); + if (comparator.compare(value, min) < 0) { + min = value; + } + if (comparator.compare(value, max) > 0) { max = value; } - if (value < min) { - min = value; - } } - Tuple2<Long, Long> 
result = new Tuple2<>(min, max); + + Tuple2<T, T> result = new Tuple2<>(min, max); out.collect(result); } } - private static class Tuple2Comparator implements Comparator<Tuple2<Long, Long>> { + private static class Tuple2Comparator<T> implements Comparator<Tuple2<T, T>>, Serializable { + + private final Comparator<T> firstComparator; + private final Comparator<T> secondComparator; + + public Tuple2Comparator(Comparator<T> comparator) { + this(comparator, comparator); + } + + public Tuple2Comparator(Comparator<T> firstComparator, + Comparator<T> secondComparator) { + this.firstComparator = firstComparator; + this.secondComparator = secondComparator; + } + @Override - public int compare(Tuple2<Long, Long> first, Tuple2<Long, Long> second) { - long result = first.f0 - second.f0; + public int compare(Tuple2<T, T> first, Tuple2<T, T> second) { + long result = firstComparator.compare(first.f0, second.f0); if (result > 0) { return 1; } else if (result < 0) { return -1; } - result = first.f1 - second.f1; + result = secondComparator.compare(first.f1, second.f1); --- End diff -- This comparator is not used for comparing whole partitions of the form ((min, max), (min, max)). Rather, it is used for comparing min with max when they are themselves tuples, so I think the comparison on the second key is vital. Imagine, for example, range-partitioning on two fields (first ascending, second descending) the Cartesian product (1 to 2) x (1 to 50), where the resulting partitions could be, for example: 1. min: (1, 50), max: (1, 17) 2. min: (1, 16), max: (2, 33) 3. min: (2, 32), max: (2, 1). In partitions 1 and 3 the min and max share the same first key, so only the comparison on the second key can tell them apart.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---