Hi Bill!

For the WordCount case, these numbers are not unexpected. Flink does not
yet use a hash aggregator for the "reduce(v1, v2)" call; it uses sort-based
aggregation instead. Flink's sort aggregations are more reliable and more
scalable than many hash aggregations, but they are often more expensive.
Hash aggregations outperform sort aggregations especially on data sets with
low key cardinality.

Adding a reliable hash aggregator that works on managed memory is on the
roadmap. For now, Flink's runtime has managed-memory sorts and hash joins,
so we chose reliability over performance for aggregations.
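To make the trade-off concrete, here is a minimal, in-memory Java sketch
of the two strategies for a word-count style reduce. It is only an
illustration under simplifying assumptions, not Flink's runtime code:
Flink's sort-based aggregation works on serialized records in managed
memory and can spill sorted runs to disk, which is where its reliability
comes from, and none of that is modelled here. The class and method names
are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class, not part of Flink.
public class AggregationSketch {

    // Hash-based aggregation: a single pass over the records; the table
    // holds one entry per DISTINCT key, so it stays small on low-cardinality data.
    static Map<String, Integer> hashAggregate(List<String> words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum); // reduce(v1, v2) = v1 + v2
        }
        return counts;
    }

    // Sort-based aggregation: sort ALL records first (O(n log n)),
    // then combine each run of equal keys into a single count.
    static Map<String, Integer> sortAggregate(List<String> words) {
        List<String> sorted = new ArrayList<>(words);
        sorted.sort(null); // null comparator = natural (lexicographic) order
        Map<String, Integer> counts = new TreeMap<>();
        int i = 0;
        while (i < sorted.size()) {
            String key = sorted.get(i);
            int count = 0;
            while (i < sorted.size() && sorted.get(i).equals(key)) {
                count++;
                i++;
            }
            counts.put(key, count);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("to", "be", "or", "not", "to", "be");
        System.out.println(hashAggregate(words).equals(sortAggregate(words))); // true
        System.out.println(sortAggregate(words)); // {be=2, not=1, or=1, to=2}
    }
}
```

The hash variant touches each record once and its table grows only with
the number of distinct words, which is why it tends to win on
low-cardinality data. The sort variant pays for sorting every record, but
an external sort can keep working when the data no longer fits in memory.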

It is cool to see that you are doing an evaluation, and we are very curious
about your results. Please let us know how it looks for other operations
and patterns, like joins, iterations, ...



Concerning performance tuning, here are a few pointers that may be
interesting:

  - You are using a lot of very small TaskManagers, each with one slot. It
will most likely be faster if you use fewer TaskManagers with more slots,
because then the network stack is shared between more tasks. This results
in fewer TCP connections, which each carry more data. You could try "-yn
$((111)) -ytm $((24*1024)) -yD taskmanager.numberOfTaskSlots=$((6))" for
example.

  - The example WordCount implementation is not particularly tuned; I
think one can do better there.

  - Flink has a mode that reuses objects (enabled via
env.getConfig().enableObjectReuse()), which takes some pressure off the
garbage collector. As long as the user code does not cache or hold on to
the objects it receives, this can reduce the load that user code places
on the garbage collector.
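The suggested TaskManager flags keep the overall resources the same. A
quick arithmetic check in Java, assuming -ytm is the memory per TaskManager
container in MB and using the number of TaskManager pairs as a rough,
hypothetical proxy for the number of TCP connections in an all-to-all
shuffle (the class and method names are made up for this sketch):

```java
// Hypothetical helper for comparing two YARN layouts; not a Flink API.
public class TaskManagerSizing {

    /** One layout: -yn, -ytm (MB) and taskmanager.numberOfTaskSlots. */
    static final class Layout {
        final int taskManagers;
        final int memoryMbPerTm;
        final int slotsPerTm;

        Layout(int taskManagers, int memoryMbPerTm, int slotsPerTm) {
            this.taskManagers = taskManagers;
            this.memoryMbPerTm = memoryMbPerTm;
            this.slotsPerTm = slotsPerTm;
        }

        long totalSlots()      { return (long) taskManagers * slotsPerTm; }
        long totalMemoryMb()   { return (long) taskManagers * memoryMbPerTm; }
        long memoryMbPerSlot() { return memoryMbPerTm / slotsPerTm; }

        // Rough proxy only (assumption): ordered pairs of distinct TaskManagers
        // that may exchange data in an all-to-all shuffle.
        long taskManagerPairs() { return (long) taskManagers * (taskManagers - 1); }
    }

    public static void main(String[] args) {
        Layout current   = new Layout(666, 4 * 1024, 1);  // -yn 666 -ytm 4096, 1 slot each
        Layout suggested = new Layout(111, 24 * 1024, 6); // -yn 111 -ytm 24576, 6 slots each

        for (Layout l : new Layout[] {current, suggested}) {
            System.out.printf("TMs=%d slots=%d totalMemMB=%d memPerSlotMB=%d tmPairs=%d%n",
                    l.taskManagers, l.totalSlots(), l.totalMemoryMb(),
                    l.memoryMbPerSlot(), l.taskManagerPairs());
        }
    }
}
```

Both layouts give 666 slots and 2,727,936 MB in total, with roughly 4 GB
per slot, but the number of TaskManager pairs drops from 442,890 to 12,210,
so each connection would carry far more data.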


BTW: Are you including the YARN startup time, or are you measuring from
when the program execution starts?


Please ping us if you have more questions!


Greetings,
Stephan


On Fri, Jun 5, 2015 at 5:16 PM, Bill Sparks <jspa...@cray.com> wrote:

>  Hi.
>
I'm running some comparisons between Flink, MRv2, and Spark (1.3), using
> the new Intel HiBench suite. I've started with the stock wordcount example
> and I'm seeing some numbers which are not where I thought they'd be.
>
>  So the question I have is: what are the configuration parameters that
> can affect performance? Is there a performance/tuning guide?
>
What we have hardware-wise is 48 Haswell nodes, each with 32 physical /
> 64 HT cores and 128 GB of RAM, FDR-connected. I'm parsing 2 TB of text,
> using the following parameters.
>
>  ./bin/flink run -m yarn-cluster \
> -yD fs.overwrite-files=true \
> -yD fs.output.always-create-directory=true \
> -yq \
> -yn $((666)) \
> -yD taskmanager.numberOfTaskSlots=$((1)) \
> -yD parallelization.degree.default=$((666)) \
> -ytm $((4*1024)) \
> -yjm $((4*1024)) \
> ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar \
> hdfs:///user/jsparks/HiBench/Wordcount/Input \
> hdfs:///user/jsparks/HiBench/Wordcount/Output
>
>  Any pointers would be greatly appreciated.
>
 Type                Date       Time      Input_data_size  Duration(s)  Throughput(bytes/s)  Throughput/node
> HadoopWordcount     2015-06-03 10:45:11  2052360935068    763.106      2689483420           2689483420
> JavaSparkWordcount  2015-06-03 10:55:24  2052360935068    411.246      4990591847           4990591847
> ScalaSparkWordcount 2015-06-03 11:06:24  2052360935068    342.777      5987452294           5987452294
>
> Type                Date       Time      Input_data_size  Duration(s)  Throughput(bytes/s)  Throughput/node
> flinkWordCount      2015-06-04 16:27:27  2052360935068    647.383      3170242244           66046713
>
>
>
>  --
>  Jonathan (Bill) Sparks
> Software Architecture
> Cray Inc.
>
