Hi Maciek,
I followed your recommendation and benchmarked Dataframe aggregations on a
Dataset. Here is what I got:
// Dataset with RDD-style code
// 34.223s
df.as[A].map(_.fieldToSum).reduce(_ + _)
// Dataset with map and Dataframes sum
// 35.372s
df.as[A].map(_.fieldToSum).agg(sum("value")).collect().head.getAs[Long](0)
Not much of a difference. It seems that as soon as you access the data
RDD-style, you force the full decoding of each row into a case class, which is
very costly.
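For reference, here is a minimal sketch of the kind of setup I mean (the names
and the data source are simplified placeholders, and spark is the usual
SparkSession, not my actual job):

// Hypothetical, simplified setup
case class A(fieldToSum: Long) // the real schema has many more fields
val df = spark.read.parquet("/path/to/data") // any source whose schema matches A
import spark.implicits._

// The typed map has to deserialize every row into an A before the lambda
// can read fieldToSum, even though only one field is used:
df.as[A].map(_.fieldToSum).reduce(_ + _)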
I find this behavior quite normal: as soon as you let the user pass a
black-box function, anything can happen inside it, so Spark has to materialize
the whole object. On the other hand, when using SQL-style functions only,
everything is "white box": Spark understands what you want to do and can
optimize it.
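To make the contrast concrete, here is the same aggregation written the three
ways discussed in this thread (just a sketch, using the imports from your
earlier message):

import org.apache.spark.sql.functions.sum
import spark.implicits._

// "Black box": Catalyst only sees an opaque Scala function, so it decodes
// the whole object A for every row before calling the lambda.
df.as[A].map(_.fieldToSum).reduce(_ + _)

// "White box": the aggregation is a Column expression, so Catalyst can
// prune down to the single field and sum it without building A at all.
df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)

// Typed middle ground (from my previous benchmark): selecting a typed column
// keeps compile-time types while decoding only that column.
df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)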
Still, to me it breaks the promise of Datasets, and I hope something can be
done here (I'm not confident on this point) and that it will be addressed in a
later release.
Best regards,
Julien
> On 28 August 2016 at 22:12, Maciej Bryński <[email protected]> wrote:
>
> Hi Julien,
> I thought about something like this:
> import org.apache.spark.sql.functions.sum
> df.as[A].map(_.fieldToSum).agg(sum("value")).collect()
> To try using a Dataframe aggregation on the Dataset instead of reduce.
>
> Regards,
> Maciek
>
> 2016-08-28 21:27 GMT+02:00 Julien Dumazert <[email protected]>:
> Hi Maciek,
>
> I've tested several variants for summing "fieldToSum":
>
> First, RDD-style code:
> df.as[A].map(_.fieldToSum).reduce(_ + _)
> df.as[A].rdd.map(_.fieldToSum).sum()
> df.as[A].map(_.fieldToSum).rdd.sum()
> All around 30 seconds. "reduce" and "sum" seem to have the same performance,
> for this use case at least.
>
> Then with sql.functions.sum:
> df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)
> 0.24 seconds, super fast.
>
> Finally, dataset with column selection:
> df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)
> 0.18 seconds, super fast again.
>
> (I've also tried replacing my sums and reduces by counts on your advice, but
> the performance is unchanged. Apparently, summing does not take much time
> compared to accessing data.)
>
> It seems that we need to use the SQL interface to reach the highest level of
> performance, which somehow breaks the promise of Datasets (preserving type
> safety while getting Catalyst and Tungsten performance, as with Dataframes).
>
> As for direct access to Row, it seems that it got much slower from 1.6 to
> 2.0. I guess it's because Dataframe is now Dataset[Row], and thus uses the
> same encoding/decoding mechanism as any other case class.
>
> Best regards,
>
> Julien
>
>> On 27 August 2016 at 22:32, Maciej Bryński <[email protected]> wrote:
>>
>>
>> 2016-08-27 15:27 GMT+02:00 Julien Dumazert <[email protected]>:
>> df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)
>>
>> I think reduce and sum have very different performance.
>> Did you try sql.functions.sum ?
>> Or if you want to benchmark access to the Row object, then the count()
>> function would be a better idea.
>>
>> Regards,
>> --
>> Maciek Bryński
>
>
>
>
> --
> Maciek Bryński