Hi,

Flink is not using all available slots by default. You have to pass the
"parallelism" as a parameter "-p 21" when submitting the job.
This might also explain the performance difference compared to MapReduce.

The datatypes you are using look okay. I don't see a performance issue
there.

Regards,
Robert


On Tue, May 17, 2016 at 10:36 AM, Serhiy Boychenko <serhiy.boyche...@cern.ch
> wrote:

> Cheerz,
>
>
>
> Basically the data is stored in CSV format. The flatMap which I have
> implemented does:
>
> String[] tokens = value.split(",");
>
> out.collect(new Tuple2<String, Double>(tokens[0],
> Double.valueOf(tokens[2])));
>
>
>
> The result calculation looks like:
>
> DataSet<Tuple2<String, String>> statistics = rawData.flatMap(new
> VariableParser()).groupBy(0).reduceGroup(new ReduceStats());
>
>
>
> ReduceStats implements GroupReduceFunction, iterates and addes values into
> DescriptiveStatistics and at the end output min, max and avg.
>
>
>
> I ran the new experiments with suggested configuration and what I have
> noticed is only one task slot is being occupied. Something I am doing is
> wrong..
>
> 3
>
> Task Managers
>
> 21
>
> Task Slots
>
> 20
>
> Available Task Slots
>
>
>
>
>
> Best regards,
>
> Serhiy.
>
>
>
> *From:* Robert Metzger [mailto:rmetz...@apache.org]
> *Sent:* 13 May 2016 15:26
> *To:* user@flink.apache.org
> *Subject:* Re: Flink performance tuning
>
>
>
> Hi,
>
>
>
> Can you try running the job with 8 slots, 7 GB (maybe you need to go down
> to 6 GB) and only three TaskManagers (-n 3) ?
>
>
>
> I'm suggesting this, because you have many small JVMs running on your
> machines. On such small machines you can probably get much more use out of
> your available memory by running a few big task managers (which can share
> all the common management infra).
>
> Another plus of running a few JVMs is that you are deducing network
> overhead, because communication can happen within the process, and less
> network transfer is required.
>
>
>
> Another big factor for performance are the datatypes used. How do you
> represent your data in Flink? (Are you using the TupleX types? or POJOs?)
>
> How do you select the key for the grouping?
>
>
>
> Regards,
>
> Robert
>
>
>
>
>
> On Fri, May 13, 2016 at 11:25 AM, Serhiy Boychenko <
> serhiy.boyche...@cern.ch> wrote:
>
> Hey,
>
>
>
> I have successfully integrated Flink into our very small test cluster (3
> machines with 8 cores, 8GBytes of memory and 2x1TB disks). Basically I am
> started the session to use YARN as RM and the data is being read from HDFS.
>
> /yarn-session.sh -n 21 -s 1 -jm 1024 -tm 1024
>
>
>
> My code is very simple, flatMap is being done on the CSV data, so I
> extract the signal name and value, I group by signal name and performing
> group reduce on the data in order to calculate max, min and average on the
> collected values.
>
>
>
> I have observed on 3 nodes, the average processing rate is around
> 11Mbytes/second. I have compared the results with MR execution(without any
> kind of tuning) and I am quite surprised, since the performance of Hadoop
> is 85Mybtes/second when executing the same query on the same data. I have
> read few reports claiming that Flink is better in comparison to MR and
> other tools. I am wondering what is wrong? Any clue?
>
>
>
> The processing rate is calculated according to the following formula:
>
> Overall processing rate = sum of total amount of data read per job/sum of
> total time the job was running (including staging periods)
>
>
>
> Best regards,
>
> Serhiy.
>
>
>

Reply via email to