Thank you Flavio. I will generate flame graphs for Flink and compare them.

On 18 November 2016 at 13:43, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> I think this could be very helpful for your study:
> http://db-blog.web.cern.ch/blog/luca-canali/2016-09-spark-20-performance-improvements-investigated-flame-graphs
>
> Best,
> Flavio
>
> On Fri, Nov 18, 2016 at 11:37 AM, CPC <acha...@gmail.com> wrote:
>
>> Hi Gabor,
>>
>> Thank you for your kind response. I forgot to mention that I actually have three workers. This is why I set the default parallelism to 6.
>>
>> For CSV reading, I deliberately did not use the CSV reader, since I want to run the same code across Spark and Flink. Collect is returning 40k records, which is not that big.
>>
>> I will try the same test with Spark 1.5 and 1.6 as well, to understand whether the Spark 2.x series has some performance improvements, because in this kind of test Spark and Flink were either on par or Flink was 10-15% faster than Spark in the past. Aside from that, are there any configuration parameters you would propose to fine-tune Flink?
>>
>> Best,
>> Anıl
>>
>> On Nov 18, 2016 12:25, "Gábor Gévay" <gga...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Your program looks mostly fine, but there are a few minor things that might help a bit:
>>>
>>> Parallelism: In your attached flink-conf.yaml, you have 2 task slots per task manager, and if you have 1 task manager, then your total number of task slots is also 2. However, your default parallelism is 6. In Flink, the recommended default parallelism is exactly the total number of task slots [1]. (This is in contrast to Spark, where the recommended setting is 2-3 per CPU core [2].)
>>>
>>> CSV reading: If your input is a CSV file, then you should use readCsvFile (instead of readTextFile and then parsing it manually).
>>>
>>> Collect call: How large is the DataSet that you are calling collect on? If it is large, then we might try to figure out a way to get the top 10 elements without first collecting the DataSet.
>>>
>>> Best,
>>> Gábor
>>>
>>> [1] https://flink.apache.org/faq.html#what-is-the-parallelism-how-do-i-set-it
>>> [2] https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism
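As a rough illustration of the readCsvFile suggestion above, here is a minimal sketch against the Flink Scala DataSet API. The file path is the one from the thread; the tuple field types, the surrounding setup, and the final sample print are assumptions, and the CombineHint used in the original job is left out:

  import org.apache.flink.api.scala._

  object AverageRatings {
    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment

      // Let Flink parse the CSV directly into typed tuples
      // (userId, movieId, rating, timestamp) instead of readTextFile plus a
      // manual split; the default field delimiter is already a comma.
      val ratings = env.readCsvFile[(String, String, Double, Long)](
        "gs://cpcflink/wikistream/ratingsheadless16x.csv")

      // Same aggregation as in the job below: (movieId, count, ratingSum),
      // then divide to get the per-movie average.
      val averages = ratings
        .map(r => (r._2, 1, r._3))
        .groupBy(0)
        .reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3))
        .map(t => (t._1, t._3 / t._2))

      // Print a small, unsorted sample; the top-10 selection is a separate step.
      averages.first(10).print()
    }
  }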
>>> 2016-11-16 22:38 GMT+01:00 CPC <acha...@gmail.com>:
>>>
>>>> Hi all,
>>>>
>>>> I am trying to compare Spark and Flink batch performance. In my test I am using ratings.csv from the http://files.grouplens.org/datasets/movielens/ml-latest.zip dataset. I also concatenated ratings.csv 16 times to increase the dataset size (a total of 390465536 records, almost 10 GB). I am reading from Google Cloud Storage with the gcs-connector, and the file schema is: userId,movieId,rating,timestamp. Basically I am calculating the average rating per movie.
>>>>
>>>> Code for Flink (I tested CombineHint.HASH and CombineHint.SORT):
>>>>
>>>>   case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)
>>>>
>>>>   def parseRating(line: String): Rating = {
>>>>     val arr = line.split(",")
>>>>     Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
>>>>   }
>>>>
>>>>   val ratings: DataSet[Rating] =
>>>>     env.readTextFile("gs://cpcflink/wikistream/ratingsheadless16x.csv").map(a => parseRating(a))
>>>>
>>>>   ratings
>>>>     .map(i => (i.movieID, 1, i.rating))
>>>>     .groupBy(0).reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3), CombineHint.HASH)
>>>>     .map(i => (i._1, i._3 / i._2))
>>>>     .collect().sortBy(_._1).sortBy(_._2)(Ordering.Double.reverse).take(10)
>>>>
>>>> With CombineHint.HASH: 3m49s; with CombineHint.SORT: 5m9s.
>>>>
>>>> Code for Spark (I tested reduceByKey and reduceByKeyLocally):
>>>>
>>>>   case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)
>>>>
>>>>   def parseRating(line: String): Rating = {
>>>>     val arr = line.split(",")
>>>>     Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
>>>>   }
>>>>
>>>>   val conf = new SparkConf().setAppName("Simple Application")
>>>>   val sc = new SparkContext(conf)
>>>>
>>>>   val keyed: RDD[(String, (Int, Double))] =
>>>>     sc.textFile("gs://cpcflink/wikistream/ratingsheadless16x.csv").map(parseRating).map(r => (r.movieID, (1, r.rating)))
>>>>
>>>>   keyed.reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2)).mapValues(i => i._2 / i._1)
>>>>     .collect.sortBy(_._1).sortBy(a => a._2)(Ordering.Double.reverse).take(10).foreach(println)
>>>>
>>>> With reduceByKeyLocally: 2.9 minutes (almost 2m54s); with reduceByKey: 3.1 minutes (almost 3m6s).
>>>>
>>>> Machine config on Google Cloud:
>>>> jobmanager / Spark master: n1-standard-1 (1 vCPU, 3.75 GB memory)
>>>> taskmanagers / Spark workers: n1-standard-2 (2 vCPUs, 7.5 GB memory)
>>>> Java version: jdk-8u102
>>>> Flink: 1.1.3
>>>> Spark: 2.0.2
>>>>
>>>> I also attached flink-conf.yaml. Although it is not such a big difference, there is a 40% performance difference between Spark and Flink. Is there something I am doing wrong? If not, how can I fine-tune Flink, or is it normal that Spark has better performance on batch data?
>>>>
>>>> Thank you in advance...
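On Gábor's last point (avoiding the client-side collect before taking the top 10), one possible shape in the Flink Scala API is sketched below. It assumes a DataSet[(String, Double)] of (movieId, averageRating) pairs, i.e. the output of the final map in the Flink job above; the movieAverages name and the sample data standing in for it are made up for illustration:

  import org.apache.flink.api.common.operators.Order
  import org.apache.flink.api.scala._

  object TopRatings {
    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment

      // Stand-in for the (movieId, averageRating) DataSet produced by the
      // aggregation in the job above.
      val movieAverages: DataSet[(String, Double)] =
        env.fromElements(("1", 3.9), ("2", 3.2), ("3", 4.5))

      // Sort on the average-rating field in a single partition (with one
      // partition the partition-local sort is effectively global) and keep
      // only the first 10 records, so only those 10 tuples reach the client.
      val top10 = movieAverages
        .sortPartition(1, Order.DESCENDING)
        .setParallelism(1)
        .first(10)
        .collect()

      top10.foreach(println)
    }
  }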