Hi,

Flink does not use all available slots by default. You have to pass the parallelism as a parameter ("-p 21") when submitting the job. This might also explain the performance difference compared to MapReduce.
The datatypes you are using look okay. I don't see a performance issue there.

Regards,
Robert

On Tue, May 17, 2016 at 10:36 AM, Serhiy Boychenko <serhiy.boyche...@cern.ch> wrote:

> Cheerz,
>
> Basically the data is stored in CSV format. The flatMap which I have
> implemented does:
>
>     String[] tokens = value.split(",");
>     out.collect(new Tuple2<String, Double>(tokens[0], Double.valueOf(tokens[2])));
>
> The result calculation looks like:
>
>     DataSet<Tuple2<String, String>> statistics = rawData.flatMap(new VariableParser()).groupBy(0).reduceGroup(new ReduceStats());
>
> ReduceStats implements GroupReduceFunction, iterates and adds values into
> DescriptiveStatistics and at the end outputs min, max and avg.
>
> I ran the new experiments with the suggested configuration and what I have
> noticed is that only one task slot is being occupied. Something I am doing
> is wrong..
>
> 3 Task Managers
> 21 Task Slots
> 20 Available Task Slots
>
> Best regards,
> Serhiy.
>
> *From:* Robert Metzger [mailto:rmetz...@apache.org]
> *Sent:* 13 May 2016 15:26
> *To:* user@flink.apache.org
> *Subject:* Re: Flink performance tuning
>
> Hi,
>
> Can you try running the job with 8 slots, 7 GB (maybe you need to go down
> to 6 GB) and only three TaskManagers (-n 3)?
>
> I'm suggesting this because you have many small JVMs running on your
> machines. On such small machines you can probably get much more use out of
> your available memory by running a few big task managers (which can share
> all the common management infra).
>
> Another plus of running a few JVMs is that you are reducing network
> overhead, because communication can happen within the process, and less
> network transfer is required.
>
> Another big factor for performance is the datatypes used. How do you
> represent your data in Flink? (Are you using the TupleX types? or POJOs?)
>
> How do you select the key for the grouping?
>
> Regards,
> Robert
>
> On Fri, May 13, 2016 at 11:25 AM, Serhiy Boychenko <serhiy.boyche...@cern.ch> wrote:
>
> Hey,
>
> I have successfully integrated Flink into our very small test cluster (3
> machines with 8 cores, 8 GBytes of memory and 2x1TB disks). Basically I
> started the session to use YARN as RM and the data is being read from HDFS.
>
>     /yarn-session.sh -n 21 -s 1 -jm 1024 -tm 1024
>
> My code is very simple: a flatMap is done on the CSV data to extract the
> signal name and value, then I group by signal name and perform a group
> reduce on the data in order to calculate max, min and average on the
> collected values.
>
> I have observed that on 3 nodes the average processing rate is around
> 11 MBytes/second. I have compared the results with MR execution (without
> any kind of tuning) and I am quite surprised, since the performance of
> Hadoop is 85 MBytes/second when executing the same query on the same data.
> I have read a few reports claiming that Flink is better in comparison to MR
> and other tools. I am wondering what is wrong? Any clue?
>
> The processing rate is calculated according to the following formula:
>
> Overall processing rate = sum of total amount of data read per job / sum of
> total time the job was running (including staging periods)
>
> Best regards,
> Serhiy.
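
To make the processing-rate formula at the end of the quoted message concrete, here is a minimal plain-Java sketch. Only the formula itself (sum of data read divided by sum of run time, staging included) comes from the thread. The JobRun type, the method name and the figures in main are made up for illustration and are not measurements from the cluster.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the "overall processing rate" formula from the thread:
// sum of data read per job / sum of job run time (staging included).
final class ProcessingRate {

    // One job run: bytes read and wall-clock seconds. Hypothetical helper type.
    static final class JobRun {
        final long bytesRead;
        final double seconds;

        JobRun(long bytesRead, double seconds) {
            this.bytesRead = bytesRead;
            this.seconds = seconds;
        }
    }

    // Overall rate in bytes per second: ratio of the sums,
    // not the mean of the per-job rates.
    static double overallBytesPerSecond(List<JobRun> runs) {
        long totalBytes = 0L;
        double totalSeconds = 0.0;
        for (JobRun run : runs) {
            totalBytes += run.bytesRead;
            totalSeconds += run.seconds;
        }
        if (totalSeconds <= 0.0) {
            throw new IllegalArgumentException("total run time must be positive");
        }
        return totalBytes / totalSeconds;
    }

    public static void main(String[] args) {
        // Made-up figures for illustration only.
        List<JobRun> runs = Arrays.asList(
                new JobRun(1_100_000_000L, 100.0),
                new JobRun(2_200_000_000L, 200.0));
        double mbPerSecond = overallBytesPerSecond(runs) / 1_000_000.0;
        System.out.println("Overall rate: " + mbPerSecond + " MB/s");
    }
}
```

Because this is a ratio of sums, long-running jobs weigh more heavily than they would if the per-job rates were simply averaged, so the two numbers can differ when job sizes vary a lot.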