[ https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15062032#comment-15062032 ]
ASF GitHub Bot commented on FLINK-7:
------------------------------------

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1255#issuecomment-165453966

I executed the Flink WordCount example on 4 nodes with 8 parallel tasks and roughly 17GB of input data, once with hash partitioning and once with range partitioning. No combiner was used in either run.

First of all, both programs compute the same result, and the result of the range-partitioned WordCount is ordered, so the implementation is correct. The hash-partitioned WC took 8:00 mins and the range-partitioned WC 13:17 mins.

The breakdown of the range-partitioned WC looks as follows:
1. Source + FlatMap: 3:01 mins
2. LocalSample: 3:01 mins
3. GlobalSample: 0:15 mins
4. Histogram: 24 ms
5. PreparePartition: 8:49 mins
6. Partition: 8:48 mins
7. GroupReduce: 10:14 mins
8. Sink: 1:09 mins

The breakdown of the hash-partitioned WC is:
1. Source + FlatMap: 6:26 mins
2. Partition: 6:25 mins
3. GroupReduce: 7:58 mins
4. Sink: 1:21 mins

So the overhead of the range-partitioned WC comes from the additional IO of reading the flatMapped words plus the additional 4-byte integer, and from the fact that the sorting of the group reduce does not happen concurrently with the source IO. Reducing (w/o sort) and sink take about the same amount of time.

I also checked the distribution of input and output records / bytes for the GroupReduce.

|       | min records-in | min bytes-in | max records-in | max bytes-in |
| ----- | -------------- | ------------ | -------------- | ------------ |
| Hash  | 197M           | 1.82GB       | 346M           | 2.90GB       |
| Range | 177M           | 1.41GB       | 352M           | 2.90GB       |

|       | min records-out | min bytes-out | max records-out | max bytes-out |
| ----- | --------------- | ------------- | --------------- | ------------- |
| Hash  | 2.5M            | 26.5MB        | 2.5M            | 26.5MB        |
| Range | 2.3K            | 28.2KB        | 14M             | 154MB         |

We see that the range partitioner does not distribute the data any better (in fact a bit worse) than the hash partitioner (the differences for output records are expected). Maybe increasing the sample size helps? The overhead of reading the intermediate data set from disk is so high that a more fine-grained histogram can be justified, IMO. How about increasing the sample size from `parallelism * 20` to `parallelism * 1000`? Other thoughts?

> [GitHub] Enable Range Partitioner
> ---------------------------------
>
> Key: FLINK-7
> URL: https://issues.apache.org/jira/browse/FLINK-7
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Runtime
> Reporter: GitHub Import
> Assignee: Chengxiang Li
> Fix For: pre-apache
>
> The range partitioner is currently disabled. We need to implement the following aspects:
> 1) Distribution information, if available, must be propagated back together with the ordering property.
> 2) A generic bucket lookup structure (currently specific to PactRecord).
> Tests to re-enable after fixing this issue:
> - TeraSortITCase
> - GlobalSortingITCase
> - GlobalSortingMixedOrderITCase
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/7
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: core, enhancement, optimizer,
> Milestone: Release 0.4
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Fri Apr 26 13:48:24 CEST 2013
> State: open

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
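For context on the comparison discussed in the comment above, the following is a minimal sketch of how such a hash- vs. range-partitioned WordCount could be expressed with the public DataSet API, assuming the `partitionByRange` method that this range-partitioning work exposes. The input path, output paths, and the exact plan of the original benchmark (including its internal LocalSample/GlobalSample/Histogram/PreparePartition operators) are assumptions, not the actual benchmark program.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountPartitioningComparison {

    // Classic WordCount tokenizer: emits a (word, 1) pair for every token in a line.
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8); // 8 parallel tasks, as in the benchmark

        // hypothetical input path; the benchmark read roughly 17GB of text
        DataSet<Tuple2<String, Integer>> words =
                env.readTextFile("hdfs:///path/to/input").flatMap(new Tokenizer());

        // Variant 1: hash partitioning -- groupBy ships each word to a worker by key hash.
        DataSet<Tuple2<String, Integer>> hashCounts = words
                .groupBy(0)
                .sum(1);

        // Variant 2: range partitioning -- keys are sampled, range boundaries are built,
        // and each word is shipped to the partition owning its key range before grouping.
        // Concatenating the per-partition outputs then yields a key-ordered result,
        // which matches the observation that the range-partitioned output is ordered.
        DataSet<Tuple2<String, Integer>> rangeCounts = words
                .partitionByRange(0)
                .groupBy(0)
                .sum(1);

        hashCounts.writeAsCsv("hdfs:///path/to/out-hash");
        rangeCounts.writeAsCsv("hdfs:///path/to/out-range");
        env.execute("WordCount: hash vs. range partitioning");
    }
}
```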
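The sample-size suggestion (`parallelism * 20` vs. `parallelism * 1000`) and the "generic bucket lookup structure" from the issue description revolve around the same mechanism: sampled keys are condensed into range boundaries, and each record is routed to a partition by looking up its key against those boundaries. The sketch below is a hypothetical, self-contained illustration of that idea, not Flink's actual boundary or histogram classes; names and data are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RangeBoundariesSketch {

    // Build (numPartitions - 1) boundary keys from a sorted sample.
    // A larger sample (e.g. parallelism * 1000 instead of parallelism * 20)
    // gives a finer-grained histogram and thus more evenly sized key ranges.
    static List<String> buildBoundaries(List<String> sample, int numPartitions) {
        List<String> sorted = new ArrayList<>(sample);
        Collections.sort(sorted);
        List<String> boundaries = new ArrayList<>();
        for (int i = 1; i < numPartitions; i++) {
            boundaries.add(sorted.get(i * sorted.size() / numPartitions));
        }
        return boundaries;
    }

    // Binary search over the boundaries: the returned index is the target partition.
    static int targetPartition(List<String> boundaries, String key) {
        int pos = Collections.binarySearch(boundaries, key);
        return pos >= 0 ? pos : -(pos + 1);
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("ant", "bee", "cat", "dog", "elk", "fox", "gnu", "hen");
        List<String> boundaries = buildBoundaries(sample, 4);
        System.out.println(boundaries);                        // [cat, elk, gnu]
        System.out.println(targetPartition(boundaries, "cow")); // 1 (between "cat" and "elk")
    }
}
```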