Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1255#issuecomment-165453966

I executed the Flink WordCount example on 4 nodes with 8 parallel tasks and roughly 17GB of input data, once with hash partitioning and once with range partitioning. Neither run used a combiner. First of all, both programs compute the same result, and the result of the range-partitioned WordCount is ordered, so the implementation is correct.

The hash-partitioned WC took 8:00 mins, the range-partitioned WC 13:17 mins. The breakdown of the range-partitioned WC looks as follows:

1. Source + FlatMap: 3:01 mins
2. LocalSample: 3:01 mins
3. GlobalSample: 0:15 mins
4. Histogram: 24 ms
5. PreparePartition: 8:49 mins
6. Partition: 8:48 mins
7. GroupReduce: 10:14 mins
8. Sink: 1:09 mins

The breakdown of the hash-partitioned WC is:

1. Source + FlatMap: 6:26 mins
2. Partition: 6:25 mins
3. GroupReduce: 7:58 mins
4. Sink: 1:21 mins

So the overhead of the range-partitioned WC comes from the additional IO of reading the flatMapped words plus the additional 4-byte integer, and from the fact that the sorting for the GroupReduce does not happen concurrently with the source IO. Reducing (w/o sort) and the sink take about the same amount of time in both cases.

I also checked the distribution of input and output records / bytes for the GroupReduce:

| | min records-in | min bytes-in | max records-in | max bytes-in |
| --- | --- | --- | --- | --- |
| Hash | 197M | 1.82GB | 346M | 2.90GB |
| Range | 177M | 1.41GB | 352M | 2.90GB |

| | min records-out | min bytes-out | max records-out | max bytes-out |
| --- | --- | --- | --- | --- |
| Hash | 2.5M | 26.5MB | 2.5M | 26.5MB |
| Range | 2.3K | 28.2KB | 14M | 154MB |

We see that the range partitioner does not distribute the data better (in fact a bit worse) than the hash partitioner (the differences for output records are expected). Maybe increasing the sample size helps?
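To make the LocalSample → GlobalSample → Histogram → PreparePartition steps concrete, here is a toy Python sketch (not Flink code; all names are illustrative): derive split points from a sorted sample, then tag each record with its target channel index, which corresponds to the extra 4-byte integer that PreparePartition attaches.

```python
import bisect

def range_boundaries(sample, num_partitions):
    """Derive num_partitions - 1 split points from a sample
    (the role of the Histogram step)."""
    s = sorted(sample)
    step = len(s) / num_partitions
    return [s[int(step * i)] for i in range(1, num_partitions)]

def assign_partition(key, boundaries):
    """Binary-search the split points; the result is the channel index
    that PreparePartition would attach to the record."""
    return bisect.bisect_right(boundaries, key)

words = ["apple", "banana", "cherry", "date", "fig", "grape", "kiwi", "lemon"]
bounds = range_boundaries(words, 4)        # ["cherry", "fig", "kiwi"]
parts = [assign_partition(w, bounds) for w in words]
# Partitions are contiguous key ranges, so concatenating the sorted
# partitions in index order yields a totally ordered result.
```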
The overhead of reading the intermediate data set from disk is so high that a more fine-grained histogram can be justified, IMO. How about increasing the sample size from `parallelism * 20` to `parallelism * 1000`? Other thoughts?
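The effect of the sample size on partition balance can be simulated with a small toy experiment (again not Flink code; the skewed synthetic key distribution and all names are my own assumptions). It measures the share of records that lands on the fullest partition when the boundaries come from a `parallelism * 20` sample versus a `parallelism * 1000` sample; with a perfectly fine-grained histogram that share would approach `1 / parallelism`.

```python
import bisect
import random

random.seed(42)
parallelism = 8
# Skewed, word-frequency-like continuous keys (mostly distinct values).
data = [random.paretovariate(1.5) for _ in range(200_000)]

def boundaries_from_sample(data, sample_size, num_partitions):
    """Build num_partitions - 1 split points from a random sample."""
    sample = sorted(random.sample(data, sample_size))
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]

def max_partition_share(data, boundaries, num_partitions):
    """Fraction of all records assigned to the fullest partition."""
    counts = [0] * num_partitions
    for key in data:
        counts[bisect.bisect_right(boundaries, key)] += 1
    return max(counts) / len(data)

small = max_partition_share(
    data, boundaries_from_sample(data, parallelism * 20, parallelism), parallelism)
large = max_partition_share(
    data, boundaries_from_sample(data, parallelism * 1000, parallelism), parallelism)
# Typically the larger sample yields split points closer to the true
# quantiles, i.e. a max share closer to 1 / parallelism.
```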