Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60075060
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception {
                result.collect(); // should fail
        }
     
    +
    +
    +   @Test
    +   public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +           final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +           DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +                           .map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +                   @Override
    +                   public Tuple2<Long, Long> map(Long value) throws Exception {
    +                           return new Tuple2<>(value / 5000, value % 5000);
    +                   }
    +           });
    +
    +           final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +                                                                           new LongComparator(false));
    +
    +           MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +           final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +                           .withOrders(Order.ASCENDING, Order.DESCENDING)
    +                           .mapPartition(minMaxSelector)
    +                           .collect();
    +
    +           Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +           Tuple2<Long, Long> previousMax = null;
    +           for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    --- End diff --
    
    I am not sure these tests correctly verify the range partitioning. The `MinMaxSelector` returns the smallest and the largest record of each partition, so what we must ensure is that the partitions do not overlap. I propose to
    
    1. check for each minmax record that min <= max (just to be sure)
    2. sort all minmax records in `collected` by the min value only (ignore the max value)
    3. check that the min value of each minmax record is strictly larger than the max value of the previous minmax record in that sorted order.
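
    The three steps could be sketched roughly as below. This is only an illustration and does not use the Flink types: `MinMax` and `partitionsAreDisjoint` are hypothetical names standing in for the `Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>` records and the assertions in the test, and the `Comparator` argument stands in for `tuple2Comparator`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class RangeCheckSketch {

    /** Hypothetical stand-in for one (min, max) record emitted per partition. */
    static final class MinMax<T> {
        final T min;
        final T max;

        MinMax(T min, T max) {
            this.min = min;
            this.max = max;
        }
    }

    /** Returns true if no two partitions overlap under the given order. */
    static <T> boolean partitionsAreDisjoint(List<MinMax<T>> collected, final Comparator<T> order) {
        // Step 1: within each partition, min must not exceed max.
        for (MinMax<T> p : collected) {
            if (order.compare(p.min, p.max) > 0) {
                return false;
            }
        }

        // Step 2: sort by the min value only; the max value is ignored here.
        List<MinMax<T>> sorted = new ArrayList<>(collected);
        sorted.sort((a, b) -> order.compare(a.min, b.min));

        // Step 3: each min must be strictly larger than the previous partition's max.
        T previousMax = null;
        for (MinMax<T> p : sorted) {
            if (previousMax != null && order.compare(p.min, previousMax) <= 0) {
                return false;
            }
            previousMax = p.max;
        }
        return true;
    }
}
```

    In the actual test each `return false` would presumably become an assertion failure, and the comparator would be the same one handed to `MinMaxSelector`, so that the check uses the ASCENDING/DESCENDING order the partitioner was configured with.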
    
    Does that make sense?


