Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1255#issuecomment-148716888
  
    Hi @ChengXiangLi, I had a look at your PR and I think we need to change the 
implementation a bit.
    Right now, it executes an additional job for each range partition operator 
to obtain a sample. The additional job executes the full program and samples at 
the end. Imagine a complex program that, for instance, includes several joins and wants to write out the result in total order, i.e., it range-partitions and sorts the result before writing it to the final sink. With the current implementation, this means the expensive job is executed twice.
    
    It would be better to inject the sampling into the actual job. This can be 
done, for example, as follows.
    A program such as:
    ```
    DataSet<X> x = ...
    x.rangePartition(0).reduce(...)
    ```
    could be translated into:
    ```
    DataSet<X> x = ...
    DataSet<Distr> dist = x.mapPartition("sample").reduce("collect samples and build distribution");
    DataSet<Tuple2<Integer, X>> xWithPIDs = x
      .map("assign PartitionIDs").withBroadcastSet(dist, "distribution");
    ```
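    A rough, purely illustrative sketch of the sampling side of this translation could look as follows. It assumes an Integer key in field 0 of a `Tuple2<Integer, String>` input; the class names `KeySampler` and `BoundaryBuilder`, the sample size, and the quantile-based boundary computation are made up for the example and are not part of the proposed API.
    ```
    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;

    public class SamplingSketch {

        /** Emits a small random (reservoir) sample of the keys of each partition. */
        public static class KeySampler
                implements MapPartitionFunction<Tuple2<Integer, String>, Integer> {

            private static final int SAMPLE_SIZE = 100;

            @Override
            public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Integer> out) {
                List<Integer> reservoir = new ArrayList<>(SAMPLE_SIZE);
                Random rnd = new Random();
                long seen = 0;
                for (Tuple2<Integer, String> t : values) {
                    seen++;
                    if (reservoir.size() < SAMPLE_SIZE) {
                        reservoir.add(t.f0);
                    } else {
                        // keep the new key with probability SAMPLE_SIZE / seen
                        long pos = (long) (rnd.nextDouble() * seen);
                        if (pos < SAMPLE_SIZE) {
                            reservoir.set((int) pos, t.f0);
                        }
                    }
                }
                for (Integer key : reservoir) {
                    out.collect(key);
                }
            }
        }

        /** Collects all sampled keys, sorts them, and emits n-1 approximate range boundaries. */
        public static class BoundaryBuilder implements GroupReduceFunction<Integer, Integer> {

            private final int numPartitions;

            public BoundaryBuilder(int numPartitions) {
                this.numPartitions = numPartitions;
            }

            @Override
            public void reduce(Iterable<Integer> sampledKeys, Collector<Integer> out) {
                List<Integer> keys = new ArrayList<>();
                for (Integer k : sampledKeys) {
                    keys.add(k);
                }
                if (keys.isEmpty()) {
                    return;
                }
                Collections.sort(keys);
                for (int i = 1; i < numPartitions; i++) {
                    out.collect(keys.get(i * keys.size() / numPartitions));
                }
            }
        }

        /** Wires the two steps together (using reduceGroup to gather all samples in one function). */
        public static DataSet<Integer> buildDistribution(DataSet<Tuple2<Integer, String>> x, int numPartitions) {
            return x.mapPartition(new KeySampler())
                    .reduceGroup(new BoundaryBuilder(numPartitions));
        }
    }
    ```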
    
    This would inject the sampling into the original program. The sampling is done as before, but the data distribution is broadcast to a map operator that uses the distribution to assign partition IDs to records, converting the `DataSet<X>` into a `DataSet<Tuple2<Integer, X>>`, similar to what a `KeySelector` does. Once the partition IDs are assigned, a RangePartitionOperator could partition the tuples on the first field (f0) with a simple Int-DataDistribution (0, 1, 2, 3, 4, ..., n). Finally, the DataSet needs to be unwrapped, i.e., converted from `DataSet<Tuple2<Integer,X>>` back to `DataSet<X>`.
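    To make the second half concrete, here is a similarly hypothetical sketch of the partition-ID assignment, partitioning, and unwrapping, again assuming `Tuple2<Integer, String>` records keyed on f0. It uses the existing `partitionCustom` with a trivial ID partitioner as a stand-in for the RangePartitionOperator with a simple Int-DataDistribution; all class names are invented for the example.
    ```
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class AssignAndPartitionSketch {

        /** Assigns a partition ID to each record by binary-searching its key in the broadcast boundaries. */
        public static class AssignPartitionId
                extends RichMapFunction<Tuple2<Integer, String>, Tuple2<Integer, Tuple2<Integer, String>>> {

            private List<Integer> boundaries;

            @Override
            public void open(Configuration parameters) {
                boundaries = new ArrayList<>(
                        getRuntimeContext().<Integer>getBroadcastVariable("distribution"));
                Collections.sort(boundaries);
            }

            @Override
            public Tuple2<Integer, Tuple2<Integer, String>> map(Tuple2<Integer, String> value) {
                int pos = Collections.binarySearch(boundaries, value.f0);
                int partitionId = pos >= 0 ? pos : -(pos + 1);
                return new Tuple2<>(partitionId, value);
            }
        }

        /** The assigned ID in f0 is used (almost) directly as the target partition. */
        public static class IdPartitioner implements Partitioner<Integer> {
            @Override
            public int partition(Integer id, int numPartitions) {
                return id % numPartitions;
            }
        }

        /** Unwraps the records after the partitioning. */
        public static class Unwrap
                implements MapFunction<Tuple2<Integer, Tuple2<Integer, String>>, Tuple2<Integer, String>> {
            @Override
            public Tuple2<Integer, String> map(Tuple2<Integer, Tuple2<Integer, String>> value) {
                return value.f1;
            }
        }

        /** Assign IDs (with the broadcast distribution), partition on f0, and unwrap. */
        public static DataSet<Tuple2<Integer, String>> rangePartitionByKey(
                DataSet<Tuple2<Integer, String>> x, DataSet<Integer> distribution) {

            return x.map(new AssignPartitionId())
                    .withBroadcastSet(distribution, "distribution")
                    .partitionCustom(new IdPartitioner(), 0)
                    .map(new Unwrap());
        }
    }
    ```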
    
    I agree it is not super nice, but this implementation would cache the intermediate result instead of recomputing it. In addition, it barely touches
the internals.
    
    It is also possible to integrate the partitioning more tightly into the 
runtime by providing the data distribution directly to the Partitioner. 
However, that would mean we need to implement a partitioning operator for the 
runtime (instead of using the regular operator and a NOOP driver).
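    For illustration, a user-level version of such a partitioner could look like the sketch below; the tighter runtime integration described here would instead do the same lookup inside the runtime's channel selection (the `OutputEmitter`) rather than in a user function. The class name and the boundary-array representation of the distribution are assumptions for the example.
    ```
    import org.apache.flink.api.common.functions.Partitioner;

    import java.util.Arrays;

    /**
     * Illustration only: a partitioner that gets the sampled range boundaries directly
     * and maps each key to a partition via binary search, instead of routing partition
     * IDs that were assigned by a preceding map operator.
     */
    public class RangeBoundaryPartitioner implements Partitioner<Integer> {

        private final int[] boundaries;  // n-1 sorted range boundaries

        public RangeBoundaryPartitioner(int[] boundaries) {
            this.boundaries = boundaries;
        }

        @Override
        public int partition(Integer key, int numPartitions) {
            int pos = Arrays.binarySearch(boundaries, key);
            int partition = pos >= 0 ? pos : -(pos + 1);
            // guard in case the distribution was built for a different parallelism
            return Math.min(partition, numPartitions - 1);
        }
    }
    ```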
    
    Btw. I have some code lying around (for a not-yet-completed feature) to extract keys from a record given the key specification. Let me know if that would help with your implementation.
    
    Regarding the implementation of the `Partitioner` and `OutputEmitter`, I am 
very open to suggestions on how to improve the design. As you said, I would
bring this discussion to the dev mailing list or open a JIRA and start a 
discussion there.
    
    What do you think? Thanks, Fabian

