Hi all, I am trying to compare Spark and Flink batch performance. In my test I am using ratings.csv from the MovieLens dataset at http://files.grouplens.org/datasets/movielens/ml-latest.zip. I also concatenated ratings.csv 16 times to increase the dataset size (390,465,536 records in total, almost 10 GB). I am reading from Google Cloud Storage with the gcs-connector, and the file schema is: userId,movieId,rating,timestamp. Basically, I am calculating the average rating per movie.
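For reference, the aggregation both jobs compute — average rating per movie via a running (count, sum) pair — can be sketched in plain Scala on a few in-memory lines (the sample records below are made up, just in the same userId,movieId,rating,timestamp format):

```scala
// Minimal sketch of the per-movie average both jobs compute,
// using plain Scala collections instead of a Flink DataSet / Spark RDD.
object AvgRatingSketch {
  // Same schema as ratings.csv: userId,movieId,rating,timestamp
  case class Rating(userID: String, movieID: String, rating: Double, ts: Long)

  def parseRating(line: String): Rating = {
    val arr = line.split(",")
    Rating(arr(0), arr(1), arr(2).toDouble, arr(3).toLong)
  }

  def averages(lines: Seq[String]): Map[String, Double] =
    lines.map(parseRating)
      .map(r => (r.movieID, (1, r.rating)))   // (movie, (count, sum)) per record
      .groupBy(_._1)
      .map { case (movie, grouped) =>
        // Combine counts and sums per movie, then divide once at the end
        val (count, sum) = grouped.map(_._2).reduce((l, r) => (l._1 + r._1, l._2 + r._2))
        movie -> sum / count
      }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample lines, not taken from the real dataset
    val sample = Seq(
      "1,10,4.0,964982703",
      "2,10,2.0,964982931",
      "3,20,5.0,964983815"
    )
    averages(sample).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Carrying (count, sum) instead of the raw ratings is what lets both engines pre-aggregate on the map side before the shuffle.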
Code for Flink (I tested both CombineHint.HASH and CombineHint.SORT):

```scala
case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)

def parseRating(line: String): Rating = {
  val arr = line.split(",")
  Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
}

val ratings: DataSet[Rating] =
  env.readTextFile("gs://cpcflink/wikistream/ratingsheadless16x.csv").map(a => parseRating(a))

ratings
  .map(i => (i.movieID, 1, i.rating))
  .groupBy(0)
  .reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3), CombineHint.HASH)
  .map(i => (i._1, i._3 / i._2))
  .collect()
  .sortBy(_._1)
  .sortBy(_._2)(Ordering.Double.reverse)
  .take(10)
```

Runtimes: 3m49s with CombineHint.HASH and 5m9s with CombineHint.SORT.

Code for Spark (I tested both reduceByKey and reduceByKeyLocally):

```scala
case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)

def parseRating(line: String): Rating = {
  val arr = line.split(",")
  Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
}

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)

val keyed: RDD[(String, (Int, Double))] =
  sc.textFile("gs://cpcflink/wikistream/ratingsheadless16x.csv")
    .map(parseRating)
    .map(r => (r.movieID, (1, r.rating)))

keyed
  .reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2))
  .mapValues(i => i._2 / i._1)
  .collect
  .sortBy(_._1)
  .sortBy(a => a._2)(Ordering.Double.reverse)
  .take(10)
  .foreach(println)
```

Runtimes: about 2.9 minutes (~2m54s) with reduceByKeyLocally and about 3.1 minutes (~3m6s) with reduceByKey.

Machine config on Google Cloud:
taskmanager/sparkmaster: n1-standard-1 (1 vCPU, 3.75 GB memory)
jobmanager/sparkworkers: n1-standard-2 (2 vCPUs, 7.5 GB memory)
Java version: JDK 8u102
Flink: 1.1.3
Spark: 2.0.2

I also attached my flink-conf.yaml. Although it is not a huge difference, there is roughly a 40% performance gap between Spark and Flink. Is there something I am doing wrong?
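A side note on the final `.sortBy(_._1).sortBy(_._2)(Ordering.Double.reverse)` chain that both snippets use on the collected results: Scala's `sortBy` is a stable sort, so sorting first by movie id and then by average rating descending yields a top-10 ordered by rating, with ties broken by movie id. A minimal illustration with made-up averages:

```scala
// Stable-sort chaining: first order by key, then by value descending.
// Because sortBy is stable, equal values keep their key order from the first sort.
val avgs = Seq(("20", 4.5), ("10", 4.5), ("30", 5.0))

val top = avgs
  .sortBy(_._1)                           // "10", "20", "30"
  .sortBy(_._2)(Ordering.Double.reverse)  // 5.0 first; the 4.5 ties stay in key order
  .take(2)
// top == Seq(("30", 5.0), ("10", 4.5))
```

A single `sortBy` with a composite key, e.g. `sortBy(a => (-a._2, a._1))`, would express the same ordering in one pass.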
If not, how can I fine-tune Flink, or is it normal for Spark to perform better on batch data? Thank you in advance...
flink-conf.yaml