Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1255#issuecomment-148716888

Hi @ChengXiangLi, I had a look at your PR and I think we need to change the implementation a bit. Right now, it executes an additional job for each range partition operator to obtain a sample. The additional job executes the full program and samples at the end. Imagine a complex program that includes, for instance, several joins and wants to write out the result in a total order, i.e., range-partitions and sorts the result before it writes to the final sink. With the current implementation, this would mean that the expensive job is executed twice.

It would be better to inject the sampling into the actual job. This can be done, for example, as follows. A program such as:

```
DataSet<X> x = ...
x.rangePartition(0).reduce(...)
```

could be translated into:

```
DataSet<X> x = ...
DataSet<Distr> dist = x
  .mapPartition("sample")
  .reduce("collect samples and build distribution");
DataSet<Tuple2<Integer, X>> xWithPIDs = x
  .map("assign PartitionIDs").withBroadcastSet(dist, "distribution");
```

This would inject the sampling into the original program. The sampling is done as before, but the data distribution is broadcast to a map operator that uses the distribution to assign partition IDs to records and converts the `DataSet<X>` into a `DataSet<Tuple2<Integer, X>>`, similar to the `KeySelector`. Once the partition IDs are assigned, a RangePartitionOperator could partition the tuples on the first field (f0) with a simple Int-DataDistribution (0, 1, 2, 3, 4, ..., n). Finally, the DataSet needs to be unwrapped, i.e., converted from `DataSet<Tuple2<Integer, X>>` back to `DataSet<X>`. I agree it is not super nice, but this implementation would cache the intermediate result instead of recomputing it. In addition, it barely touches the internals.

It is also possible to integrate the partitioning more tightly into the runtime by providing the data distribution directly to the Partitioner. However, that would mean we need to implement a partitioning operator for the runtime (instead of using the regular operator and a NOOP driver).

Btw. I have some code lying around (for a not-yet-completed feature) to extract keys from a record given the key specification. Let me know if that would help for your implementation.

Regarding the implementation of the `Partitioner` and `OutputEmitter`, I am very open to suggestions on how to improve the design. As you said, I would bring this discussion to the dev mailing list or open a JIRA and start a discussion there.

What do you think?

Thanks, Fabian
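PS: To make the proposed translation more concrete, here is a rough, untested sketch of how it could look with the current DataSet API, using a `DataSet<Integer>` instead of a generic `DataSet<X>` for simplicity. Everything beyond the API calls themselves is a placeholder, not a final design: the `"distribution"` broadcast name, the fixed sampling rate, and the boundary computation are all illustrative.

```
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RangePartitionSketch {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Integer> x = env.fromElements(5, 3, 8, 1, 9, 2, 7, 4, 6);
    final int numPartitions = 4;

    // 1) "sample": draw a local sample from each partition.
    DataSet<Integer> samples = x.mapPartition(
        new MapPartitionFunction<Integer, Integer>() {
          @Override
          public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
            Random rnd = new Random();
            for (Integer v : values) {
              if (rnd.nextDouble() < 0.5) { // sampling rate is a placeholder
                out.collect(v);
              }
            }
          }
        });

    // 2) "collect samples and build distribution": gather all samples
    //    and pick numPartitions - 1 range boundaries.
    DataSet<List<Integer>> dist = samples.reduceGroup(
        new GroupReduceFunction<Integer, List<Integer>>() {
          @Override
          public void reduce(Iterable<Integer> values, Collector<List<Integer>> out) {
            List<Integer> sorted = new ArrayList<>();
            for (Integer v : values) {
              sorted.add(v);
            }
            Collections.sort(sorted);
            List<Integer> boundaries = new ArrayList<>();
            for (int i = 1; i < numPartitions && !sorted.isEmpty(); i++) {
              boundaries.add(sorted.get(i * sorted.size() / numPartitions));
            }
            out.collect(boundaries);
          }
        });

    // 3) "assign PartitionIDs": broadcast the distribution and wrap each
    //    record into a Tuple2<partitionId, record>.
    DataSet<Tuple2<Integer, Integer>> xWithPIDs = x.map(
        new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
          private List<Integer> boundaries;

          @Override
          public void open(Configuration parameters) {
            boundaries = getRuntimeContext()
                .<List<Integer>>getBroadcastVariable("distribution").get(0);
          }

          @Override
          public Tuple2<Integer, Integer> map(Integer value) {
            int pid = 0;
            while (pid < boundaries.size() && value > boundaries.get(pid)) {
              pid++;
            }
            return new Tuple2<>(pid, value);
          }
        }).withBroadcastSet(dist, "distribution");

    // 4) Partition on the pre-computed ID (f0) and unwrap back to DataSet<X>.
    DataSet<Integer> ranged = xWithPIDs
        .partitionCustom(new Partitioner<Integer>() {
          @Override
          public int partition(Integer key, int n) {
            return key % n; // IDs are already in [0, numPartitions)
          }
        }, 0)
        .map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
          @Override
          public Integer map(Tuple2<Integer, Integer> t) {
            return t.f1;
          }
        });

    ranged.print();
  }
}
```

The `partitionCustom` call in step 4 stands in for the RangePartitionOperator mentioned above; since the partition IDs are already in the range 0 to n-1, the custom partitioner is a trivial identity mapping and plays the role of the simple Int-DataDistribution.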