Hi Maciek,

I followed your recommendation and benchmarked DataFrame-style aggregations on 
a Dataset. Here is what I got:

// Dataset with RDD-style code
// 34.223s
df.as[A].map(_.fieldToSum).reduce(_ + _)
// Dataset with map followed by a DataFrame sum
// 35.372s
df.as[A].map(_.fieldToSum).agg(sum("value")).collect().head.getAs[Long](0)

Not much of a difference. It seems that as soon as you access the data as you 
would with RDDs, you force the full decoding of each object into a case class, 
which is very costly.
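
To make this concrete (a minimal sketch reusing the variants from my earlier 
benchmarks below; A and fieldToSum stand for my actual case class and field), 
selecting the field as a typed column avoids decoding the whole object:

// Black-box lambda: every field of A is deserialized for each row
df.as[A].map(_.fieldToSum).reduce(_ + _)
// Typed column selection: only fieldToSum leaves Tungsten's binary format
// (this is the variant that ran in ~0.18s in the measurements quoted below)
df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)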

I find this behavior quite normal: as soon as you let the user pass a 
black-box function, anything can happen inside it, so Spark has to deserialize 
the whole object. On the other hand, when using SQL-style functions only, 
everything is "white box": Spark understands what you want to do and can 
optimize.
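
A quick way to check this (not a benchmark; I'm reading explain() output here, 
so the exact operator names may vary by version) is to compare the physical 
plans of the two approaches:

// The lambda version wraps the map in object (de)serialization steps:
// DeserializeToObject / MapElements / SerializeFromObject appear in the plan
df.as[A].map(_.fieldToSum).agg(sum("value")).explain()
// The pure SQL version stays inside Catalyst/Tungsten, with no such steps
df.agg(sum("fieldToSum")).explain()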

Still, it breaks the promise of Datasets for me, and I hope there is something 
that can be done here (I'm not confident on this point) and that it will be 
addressed in a later release.

Best regards,
Julien


> On 28 Aug 2016, at 22:12, Maciej Bryński <mac...@brynski.pl> wrote:
> 
> Hi Julien,
> I thought about something like this:
> import org.apache.spark.sql.functions.sum
> df.as[A].map(_.fieldToSum).agg(sum("value")).collect()
> To try using Dataframes aggregation on Dataset instead of reduce.
> 
> Regards,
> Maciek
> 
> 2016-08-28 21:27 GMT+02:00 Julien Dumazert <julien.dumaz...@gmail.com>:
> Hi Maciek,
> 
> I've tested several variants for summing "fieldToSum":
> 
> First, RDD-style code:
> df.as[A].map(_.fieldToSum).reduce(_ + _)
> df.as[A].rdd.map(_.fieldToSum).sum()
> df.as[A].map(_.fieldToSum).rdd.sum()
> All around 30 seconds. "reduce" and "sum" seem to have the same performance, 
> for this use case at least.
> 
> Then with sql.functions.sum:
> df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)
> 0.24 seconds, super fast.
> 
> Finally, dataset with column selection:
> df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)
> 0.18 seconds, super fast again.
> 
> (I've also tried replacing my sums and reduces with counts, as you advised, 
> but the performance is unchanged. Apparently, summing does not take much time 
> compared to accessing the data.)
> 
> It seems that we need to use the SQL interface to reach the highest level of 
> performance, which somehow breaks the promise of Datasets (preserving type 
> safety while getting the same Catalyst and Tungsten performance as DataFrames).
> 
> As for direct access to Row, it seems it got much slower from 1.6 to 2.0. I 
> guess it's because DataFrame is now Dataset[Row], and thus uses the same 
> encoding/decoding mechanism as any other case class.
> 
> Best regards,
> 
> Julien
> 
>> On 27 Aug 2016, at 22:32, Maciej Bryński <mac...@brynski.pl> wrote:
>> 
>> 
>> 2016-08-27 15:27 GMT+02:00 Julien Dumazert <julien.dumaz...@gmail.com>:
>> df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)
>> 
>> I think reduce and sum have very different performance.
>> Did you try sql.functions.sum?
>> Or if you want to benchmark access to the Row object, then the count() 
>> function would be a better idea.
>> 
>> Regards,
>> -- 
>> Maciek Bryński
> 
> 
> 
> 
> -- 
> Maciek Bryński
