+1 to what Gábor said. The hash combine will be part of the upcoming
1.1 release, too.

This could be further amplified by the blocking intermediate results:
they currently have a very simplistic implementation that writes out many
separate files, which can lead to a lot of random I/O.
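
In case it helps to double-check, here is a minimal sketch of where the
execution mode is set (just an illustration; PIPELINED is the default and
avoids the blocking intermediate results):

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// BATCH_FORCED materializes every shuffle as blocking intermediate results on
// disk; the default PIPELINED mode streams records between operators instead.
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);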

– Ufuk

On Tue, Jul 26, 2016 at 11:41 AM, Gábor Gévay <gga...@gmail.com> wrote:
> Hello Robert,
>
>> Is there something I could do to optimize the grouping?
>
> You can try to make your `RichGroupReduceFunction` implement the
> `GroupCombineFunction` interface, so that Flink can do combining
> before the shuffle, which might significantly reduce the network load.
> (How much the combiner helps performance depends greatly on how
> large your groups are on average.)
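>
> A rough sketch of what that could look like (with simplified placeholder
> key/value types, not your actual classes):
>
> import org.apache.flink.api.common.functions.GroupCombineFunction;
> import org.apache.flink.api.common.functions.RichGroupReduceFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
>
> public class MyReducer
>         extends RichGroupReduceFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>
>         implements GroupCombineFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
>
>     @Override
>     public void combine(Iterable<Tuple2<Integer, Long>> values,
>                         Collector<Tuple2<Integer, Long>> out) {
>         // Runs on the sending side before the shuffle; it must emit the same
>         // type it consumes (here: a per-key partial sum).
>         Integer key = null;
>         long sum = 0;
>         for (Tuple2<Integer, Long> value : values) {
>             key = value.f0;
>             sum += value.f1;
>         }
>         out.collect(Tuple2.of(key, sum));
>     }
>
>     @Override
>     public void reduce(Iterable<Tuple2<Integer, Long>> values,
>                        Collector<Tuple2<Integer, Long>> out) {
>         // Runs after the shuffle on the pre-combined records; for a simple sum
>         // the logic is the same as the combine step.
>         combine(values, out);
>     }
> }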
>
> Alternatively, if you can reformulate your algorithm to use a `reduce`
> instead of a `reduceGroup`, that might also improve the performance.
> Also, if you are using a `reduce`, you can try calling
> `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
> hint is a relatively new feature, so you need the current master for
> this.)
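>
> For example (again with placeholder types and names; CombineHint comes from
> org.apache.flink.api.common.operators.base.ReduceOperatorBase):
>
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.tuple.Tuple2;
>
> DataSet<Tuple2<Integer, Long>> summed = keyedValues
>     .groupBy(0)
>     .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
>         @Override
>         public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> a, Tuple2<Integer, Long> b) {
>             // Pairwise merge of two records with the same key.
>             return Tuple2.of(a.f0, a.f1 + b.f1);
>         }
>     })
>     .setCombineHint(CombineHint.HASH); // hash-based combiner instead of the sort-based default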
>
> Best,
> Gábor
>
>
>
> 2016-07-25 14:06 GMT+02:00 Paschek, Robert <robert.pasc...@tu-berlin.de>:
>> Hi Mailing List,
>>
>>
>>
>> I am currently running some benchmarks with different algorithms. The system has 8
>> nodes and a configured parallelism of 48 – the IBM-Power-1 cluster, if
>> somebody from the TU Berlin reads this :-) – and to “simulate” Hadoop
>> MapReduce, the execution mode is set to “BATCH_FORCED”.
>>
>>
>>
>> It is suspicious that three of the six algorithms show a big gap in runtime
>> (5,000 ms vs. 20,000 ms) for simple (low-dimensional) tuples. Additionally, the
>> algorithms in the “upper” group use a groupBy transformation, while the
>> algorithms in the “lower” group don’t use groupBy.
>>
>> I attached the plot for better visualization.
>>
>>
>>
>> I also checked the logs, especially the times when the mappers finish and
>> the reducers start _iterating_ – they confirmed my suspicion.
>>
>>
>>
>> So my question is whether it is “normal” that grouping is so cost-intensive
>> that – in my case – the runtime increases by a factor of 4.
>>
>> I have data from the same experiments running on a 13-node cluster with 26
>> cores with Apache Hadoop MapReduce, where the gap is still present, but
>> smaller (50 s vs. 57 s, or 55 s vs. 65 s).
>>
>>
>>
>> Is there something I could do to optimize the grouping? Some
>> code snippets:
>>
>>
>>
>> The Job:
>>
>> DataSet<?> output = input
>>     .mapPartition(new MR_GPMRS_Mapper())
>>         .withBroadcastSet(metaData, "MetaData")
>>         .withParameters(parameters)
>>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_MAPPER")
>>     .groupBy(0)
>>     .reduceGroup(new MR_GPMRS_Reducer())
>>         .returns(input.getType())
>>         .withBroadcastSet(metaData, "MetaData")
>>         .withParameters(parameters)
>>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_REDUCER");
>>
>>
>>
>> MR_GPMRS_Mapper():
>>
>> public class MR_GPMRS_Mapper<T extends Tuple> extends
>>     RichMapPartitionFunction<T, Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>>
>>
>>
>>
>> MR_GPMRS_Reducer():
>>
>> public class MR_GPMRS_Reducer<T extends Tuple> extends
>>     RichGroupReduceFunction<Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>, T>
>>
>>
>>
>> The Tuple2 carries the Tuple3 payload at position f1 and the Integer key
>> used for grouping at position f0.
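>>
>> For illustration, a record emitted by the mapper looks roughly like this
>> (partitionKey, buckets and the two bit sets are just placeholder variable names):
>>
>> // inside MR_GPMRS_Mapper.mapPartition(...):
>> out.collect(Tuple2.of(partitionKey,                       // f0: Integer grouping key
>>         Tuple3.of(buckets, visitedBits, prunedBits)));    // f1: payload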
>>
>>
>>
>> Any suggestions (or comments that this is “normal” behaviour) are welcome
>> :-)
>>
>>
>>
>> Thank you in advance!
>>
>> Robert
