[ https://issues.apache.org/jira/browse/FLINK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15230443#comment-15230443 ]
ASF GitHub Bot commented on FLINK-3665:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r58895362

    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -98,6 +101,14 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		this.customPartitioner = customPartitioner;
     		this.distribution = distribution;
     	}
    +
    +	public PartitionOperator<T> withOrders(Order... orders) {
    --- End diff --

    We should check that the number of `orders` is the same as the number of specified keys.
    Unfortunately, this is not as trivial as it sounds, because `Keys` does not give access to the originally specified keys but only to the flattened logical keys. If a program specifies a `Tuple2<Long, Long>` as key, it will only specify a single order, but the logical keys will be flattened to `[Long, Long]`.

    I think we should extend the `Keys` class with a method `TypeInformation<?>[] getOriginalKeyFieldTypes()` which returns the unflattened field types. Using that method we can see how many flat fields exist for each specified key field.


> Range partitioning lacks support to define sort orders
> ------------------------------------------------------
>
>                 Key: FLINK-3665
>                 URL: https://issues.apache.org/jira/browse/FLINK-3665
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Fabian Hueske
>             Fix For: 1.1.0
>
>
> {{DataSet.partitionByRange()}} does not allow specifying the sort order of
> fields. This is fine if range partitioning is used to reduce skewed
> partitioning.
> However, it is not sufficient if range partitioning is used to sort a data
> set in parallel.
> Since {{DataSet.partitionByRange()}} is {{@Public}} API and cannot be easily
> changed, I propose to add a method {{withOrders(Order... orders)}} to
> {{PartitionOperator}}.
> The method should throw an exception if the
> partitioning method of {{PartitionOperator}} is not range partitioning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
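
For illustration, the two checks discussed in this message (reject `withOrders` unless range partitioning is used, and require one order per originally specified key rather than per flattened key) could be sketched roughly as follows. This is a standalone sketch, not the code from the pull request: `WithOrdersSketch`, `expandOrders`, and the `flatFieldsPerKey` array are hypothetical names, the two enums are simplified local stand-ins for Flink's types, and expanding each order across the flat fields of its key is an assumption about how the result would be used. In Flink itself the per-key flat field counts would presumably be derived from the types returned by the proposed `getOriginalKeyFieldTypes()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WithOrdersSketch {

    // Simplified local stand-ins for Flink's enums (assumption, not the real types).
    enum Order { ASCENDING, DESCENDING }
    enum PartitionMethod { HASH, RANGE, CUSTOM }

    /**
     * Validates the given orders and expands them to one order per flattened key field.
     *
     * @param method           partitioning method of the (hypothetical) operator
     * @param flatFieldsPerKey number of flat fields for each originally specified key,
     *                         e.g. 2 for a Tuple2<Long, Long> key, 1 for a Long key
     * @param orders           one order per originally specified key
     */
    static Order[] expandOrders(PartitionMethod method, int[] flatFieldsPerKey, Order... orders) {
        // Orders are only meaningful for range partitioning.
        if (method != PartitionMethod.RANGE) {
            throw new IllegalStateException(
                "Orders can only be set for range partitioning, but method is " + method);
        }
        // Compare against the unflattened key count, not the flattened one.
        if (orders.length != flatFieldsPerKey.length) {
            throw new IllegalArgumentException(
                "Expected " + flatFieldsPerKey.length + " orders, but got " + orders.length);
        }
        // Repeat each order for every flat field of its key.
        List<Order> flat = new ArrayList<>();
        for (int i = 0; i < orders.length; i++) {
            for (int j = 0; j < flatFieldsPerKey[i]; j++) {
                flat.add(orders[i]);
            }
        }
        return flat.toArray(new Order[0]);
    }

    public static void main(String[] args) {
        // One Tuple2<Long, Long> key (2 flat fields) followed by one Long key (1 flat field).
        Order[] flat = expandOrders(
            PartitionMethod.RANGE, new int[] {2, 1}, Order.DESCENDING, Order.ASCENDING);
        System.out.println(Arrays.toString(flat)); // prints [DESCENDING, DESCENDING, ASCENDING]
    }
}
```

With this shape, a program that partitions on a single `Tuple2<Long, Long>` key passes exactly one order, and the mismatch with the two flattened logical keys no longer causes a false validation error.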