Andres - this is great feedback. Let me think about it a little bit more
and reply later.


On Thu, May 19, 2016 at 11:12 AM, Andres Perez <and...@tresata.com> wrote:

> Hi all,
>
> We were in the process of porting an RDD program to one which uses
> Datasets. Most things were easy to transition, but one hole in
> functionality we found was the ability to reduce a Dataset by key,
> something akin to PairRDDFunctions.reduceByKey. Our first attempt at adding
> the functionality ourselves involved creating a KeyValueGroupedDataset and
> calling reduceGroups to get the reduced Dataset.
>
>   class RichPairDataset[K, V: ClassTag](val ds: Dataset[(K, V)]) {
>     def reduceByKey(func: (V, V) => V)(
>         implicit e1: Encoder[K], e2: Encoder[V], e3: Encoder[(K, V)]): Dataset[(K, V)] =
>       ds.groupByKey(_._1)
>         .reduceGroups { (tup1, tup2) => (tup1._1, func(tup1._2, tup2._2)) }
>         .map { case (k, (_, v)) => (k, v) }
>   }
>
> Note that the function passed into .reduceGroups takes the whole key-value
> pair. It'd be nicer to pass in a function that maps just the values, i.e.
> reduceGroups(func). This would require the ability to modify the values of
> the KeyValueGroupedDataset (which is returned by the .groupByKey call on a
> Dataset). Such a function (e.g., KeyValueGroupedDataset.mapValues(func: V
> => U)) does not currently exist.
>
> The more important issue, however, is the inefficiency of .reduceGroups.
> The function does not support partial aggregation (reducing map-side), and
> as a result requires shuffling all the data in the Dataset. A more
> efficient alternative that we explored involved creating a Dataset
> from the KeyValueGroupedDataset by creating an Aggregator and passing it as
> a TypedColumn to KeyValueGroupedDataset's .agg function. Unfortunately, the
> Aggregator necessitated the creation of a zero to create a valid monoid.
> However, the zero is dependent on the reduce function. The zero for a
> function such as addition on Ints would be different from the zero for
> taking the minimum over Ints, for example. The Aggregator requires that we
> not break the rule of reduce(a, zero) == a. To do this we had to create an
> Aggregator with a buffer type that stores the value along with a null flag
> (using Scala's nice Option syntax yielded some mysterious errors that I
> haven't worked through yet, unfortunately), used by the zero element to
> signal that it should not participate in the reduce function.
>
> -Andy
>
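
For anyone following along, here is a minimal plain-Scala sketch of the
flagged-buffer idea Andy describes above. It only mirrors the
zero/reduce/merge/finish shape of an Aggregator and does not depend on
Spark. FlaggedReducer, FlaggedReducerDemo and reduceAll are hypothetical
names made up for illustration, the nested Seq stands in for partitions,
and returning an Option from finish is an assumption of this sketch rather
than something from the original message.

```scala
// Hypothetical helper, not part of Spark: lifts an arbitrary binary reduce
// function into something with a zero by pairing each value with a flag.
final class FlaggedReducer[V](func: (V, V) => V) {
  // The flag is false for the zero; its value slot is a placeholder that
  // is never passed to func.
  val zero: (Boolean, V) = (false, null.asInstanceOf[V])

  // Fold one input value into a buffer (the map-side step).
  def reduce(buf: (Boolean, V), a: V): (Boolean, V) =
    if (buf._1) (true, func(buf._2, a)) else (true, a)

  // Combine two partial buffers (the reduce-side step).
  def merge(b1: (Boolean, V), b2: (Boolean, V)): (Boolean, V) =
    if (!b1._1) b2
    else if (!b2._1) b1
    else (true, func(b1._2, b2._2))

  // Extract the result; None means no input was ever seen.
  def finish(buf: (Boolean, V)): Option[V] =
    if (buf._1) Some(buf._2) else None
}

object FlaggedReducerDemo {
  // Reduce each simulated partition on its own, then merge the partials.
  def reduceAll[V](partitions: Seq[Seq[V]])(func: (V, V) => V): Option[V] = {
    val r = new FlaggedReducer[V](func)
    val partials =
      partitions.map(part => part.foldLeft(r.zero)((b, a) => r.reduce(b, a)))
    r.finish(partials.foldLeft(r.zero)((b1, b2) => r.merge(b1, b2)))
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(3, 1, 4), Seq.empty[Int], Seq(1, 5))
    println(reduceAll(parts)((a, b) => math.min(a, b))) // prints Some(1)
    println(reduceAll(parts)(_ + _))                    // prints Some(14)
  }
}
```

With this shape the zero is the same for addition, minimum, or any other
func, because func is only ever applied to two real values.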
