Andres - this is great feedback. Let me think about it a little bit more and reply later.
On Thu, May 19, 2016 at 11:12 AM, Andres Perez <and...@tresata.com> wrote:

> Hi all,
>
> We were in the process of porting an RDD program to one which uses
> Datasets. Most things were easy to transition, but one hole in
> functionality we found was the ability to reduce a Dataset by key,
> something akin to PairRDDFunctions.reduceByKey. Our first attempt at
> adding the functionality ourselves involved creating a
> KeyValueGroupedDataset and calling reduceGroups to get the reduced
> Dataset.
>
>   class RichPairDataset[K, V: ClassTag](val ds: Dataset[(K, V)]) {
>     def reduceByKey(func: (V, V) => V)
>         (implicit e1: Encoder[K], e2: Encoder[V], e3: Encoder[(K, V)]): Dataset[(K, V)] =
>       ds.groupByKey(_._1)
>         .reduceGroups { (tup1, tup2) => (tup1._1, func(tup1._2, tup2._2)) }
>         .map { case (k, (_, v)) => (k, v) }
>   }
>
> Note that the function passed into .reduceGroups takes in the whole
> key-value pair. It would be nicer to pass in a function that maps just
> the values, i.e. reduceGroups(func). This would require the ability to
> modify the values of the KeyValueGroupedDataset (which is returned by
> the .groupByKey call on a Dataset). Such a function (e.g.
> KeyValueGroupedDataset.mapValues(func: V => U)) does not currently exist.
>
> The more important issue, however, is the inefficiency of .reduceGroups.
> The function does not support partial aggregation (reducing map-side),
> and as a result requires shuffling all the data in the Dataset. A more
> efficient alternative that we explored involved creating a Dataset from
> the KeyValueGroupedDataset by creating an Aggregator and passing it as a
> TypedColumn to KeyValueGroupedDataset's .agg function. Unfortunately, the
> Aggregator necessitated the creation of a zero to form a valid monoid.
> However, the zero depends on the reduce function: the zero for addition
> on Ints, for example, differs from the zero for taking the minimum over
> Ints. The Aggregator requires that we not break the rule
> reduce(a, zero) == a. To do this we had to create an Aggregator with a
> buffer type that stores the value along with a null flag (using Scala's
> nice Option syntax yielded some mysterious errors that I haven't worked
> through yet, unfortunately), used by the zero element to signal that it
> should not participate in the reduce function.
>
> -Andy
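For concreteness, here is a minimal sketch of what the wrapper above could look like if the proposed KeyValueGroupedDataset.mapValues(func: V => U) existed. The mapValues call is hypothetical (it is exactly the method Andy says is missing); everything else is the existing Dataset API:

  import org.apache.spark.sql.{Dataset, Encoder}

  // Hypothetical sketch: assumes the proposed mapValues(func: V => U) on
  // KeyValueGroupedDataset, which does not currently exist in the API.
  def reduceByKey[K, V](ds: Dataset[(K, V)])(func: (V, V) => V)
      (implicit kEnc: Encoder[K], vEnc: Encoder[V]): Dataset[(K, V)] =
    ds.groupByKey(_._1)   // KeyValueGroupedDataset[K, (K, V)]
      .mapValues(_._2)    // KeyValueGroupedDataset[K, V] -- proposed method
      .reduceGroups(func) // Dataset[(K, V)]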
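And a minimal sketch of the null-flag Aggregator approach Andy describes, assuming the Spark 2.x Aggregator API. The names ReduceByKeyAggregator and reduceByKey are illustrative, and a plain (Boolean, V) tuple stands in for the value-plus-null-flag buffer:

  import org.apache.spark.sql.{Dataset, Encoder}
  import org.apache.spark.sql.expressions.Aggregator

  // Buffer is (seen, value): the Boolean flag lets the zero element opt out
  // of func, so reduce(a, zero) == a holds for any reduce function.
  class ReduceByKeyAggregator[K, V](func: (V, V) => V)
      (implicit bufEnc: Encoder[(Boolean, V)], outEnc: Encoder[V])
    extends Aggregator[(K, V), (Boolean, V), V] {

    // Dummy value; it never reaches func because the flag is false.
    def zero: (Boolean, V) = (false, null.asInstanceOf[V])

    def reduce(b: (Boolean, V), a: (K, V)): (Boolean, V) =
      if (b._1) (true, func(b._2, a._2)) else (true, a._2)

    def merge(b1: (Boolean, V), b2: (Boolean, V)): (Boolean, V) =
      if (!b1._1) b2
      else if (!b2._1) b1
      else (true, func(b1._2, b2._2))

    def finish(b: (Boolean, V)): V = b._2

    def bufferEncoder: Encoder[(Boolean, V)] = bufEnc
    def outputEncoder: Encoder[V] = outEnc
  }

  // Unlike reduceGroups, going through .agg lets Spark perform partial
  // (map-side) aggregation before the shuffle.
  def reduceByKey[K, V](ds: Dataset[(K, V)])(func: (V, V) => V)
      (implicit kEnc: Encoder[K], bufEnc: Encoder[(Boolean, V)],
       vEnc: Encoder[V]): Dataset[(K, V)] =
    ds.groupByKey(_._1).agg(new ReduceByKeyAggregator[K, V](func).toColumn)

Usage would be something like reduceByKey(kvDs)((a, b) => math.min(a, b)) with import spark.implicits._ in scope to supply the tuple encoders.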