Hi Maciek,

I followed your recommendation and benchmarked DataFrame aggregations on a Dataset. Here is what I got:
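(For context, the setup behind the numbers below is roughly the following. This is only a minimal sketch: the case class A and its fieldToSum column are the names used in this thread, but the payload field, the row count and the local-mode SparkSession are made up so that the snippet is self-contained.)

import org.apache.spark.sql.SparkSession

case class A(fieldToSum: Long, payload: String)

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dataset-agg-benchmark")
  .getOrCreate()
import spark.implicits._

// Synthetic data; the size is arbitrary, just large enough to make decoding cost visible.
val df = spark.range(0L, 10000000L).as[Long]
  .map(i => A(i, s"row-$i"))
  .toDF()
df.cache().count()   // materialize before timing anything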
// Dataset with RDD-style code: 34.223s
df.as[A].map(_.fieldToSum).reduce(_ + _)

// Dataset with map and DataFrame sum: 35.372s
df.as[A].map(_.fieldToSum).agg(sum("value")).collect().head.getAs[Long](0)

Not much of a difference. It seems that as soon as you access the data RDD-style, you force the full decoding of each object into a case class, which is very costly. I find this behaviour quite normal: as soon as the user can pass a black-box function, anything can happen inside it, so Spark has to load the whole object. On the other hand, when only SQL-style functions are used, everything is "white box", so Spark understands what you want to do and can optimize. Still, it breaks the promise of Datasets for me, and I hope something can be done about it (I am not confident on this point) and that it will be addressed in a later release.

Best regards,

Julien

> On 28 August 2016 at 22:12, Maciej Bryński <mac...@brynski.pl> wrote:
>
> Hi Julien,
> I thought about something like this:
>
> import org.apache.spark.sql.functions.sum
> df.as[A].map(_.fieldToSum).agg(sum("value")).collect()
>
> to try using DataFrame aggregation on a Dataset instead of reduce.
>
> Regards,
> Maciek
>
> 2016-08-28 21:27 GMT+02:00 Julien Dumazert <julien.dumaz...@gmail.com>:
> Hi Maciek,
>
> I've tested several variants for summing "fieldToSum".
>
> First, RDD-style code:
>
> df.as[A].map(_.fieldToSum).reduce(_ + _)
> df.as[A].rdd.map(_.fieldToSum).sum()
> df.as[A].map(_.fieldToSum).rdd.sum()
>
> All around 30 seconds. "reduce" and "sum" seem to have the same performance, for this use case at least.
>
> Then with sql.functions.sum:
>
> df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)
>
> 0.24 seconds, super fast.
>
> Finally, Dataset with column selection:
>
> df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)
>
> 0.18 seconds, super fast again.
>
> (I've also tried replacing my sums and reduces with counts, as you advised, but the performance is unchanged. Apparently, summing does not take much time compared to accessing the data.)
>
> It seems that we need to use the SQL interface to reach the highest level of performance, which somehow breaks the promise of Datasets (preserving type safety while getting Catalyst and Tungsten performance, as with DataFrames).
>
> As for direct access to Row, it seems that it got much slower from 1.6 to 2.0. I guess it's because DataFrame is now Dataset[Row], and thus uses the same encoding/decoding mechanism as any other case class.
>
> Best regards,
>
> Julien
>
>> On 27 August 2016 at 22:32, Maciej Bryński <mac...@brynski.pl> wrote:
>>
>> 2016-08-27 15:27 GMT+02:00 Julien Dumazert <julien.dumaz...@gmail.com>:
>> df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)
>>
>> I think reduce and sum have very different performance.
>> Did you try sql.functions.sum?
>> Or if you want to benchmark access to the Row object, then the count() function would be a better idea.
>>
>> Regards,
>> --
>> Maciek Bryński
>
>
> --
> Maciek Bryński
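P.S. For the archives, here are the variants discussed in this thread side by side, as a small sketch. It assumes df is a DataFrame with a Long column fieldToSum, A is the matching case class, and spark.implicits._ is in scope; the variable names viaMap/viaAgg/viaTypedSelect are just for illustration, and the timings in the comments are only the ones I measured above, on my data.

import org.apache.spark.sql.functions.sum

// 1) Typed map + reduce: every row is fully decoded into A before summing (~34 s above).
val viaMap: Long = df.as[A].map(_.fieldToSum).reduce(_ + _)

// 2) Untyped DataFrame aggregation: Catalyst sees the whole plan and only reads one column (~0.24 s above).
val viaAgg: Long = df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)

// 3) Typed column selection: stays in the Dataset API, and only the selected column is decoded (~0.18 s above).
val viaTypedSelect: Long = df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)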