Is there a way to do aggregateByKey on Datasets the way one can on an RDD? Consider the following RDD code, which aggregates a set of KeyVals into a DataFrame with one column holding the KeyVals' keys and another holding the list of KeyVals for each key. The end goal is to join it with other collections that will be similarly transformed.
case class KeyVal(k: Int, v: Int)

val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i, j))

// function for appending to a list
val addToList = (s: List[KeyVal], v: KeyVal) => s :+ v

// function for merging two lists
val addLists = (s: List[KeyVal], t: List[KeyVal]) => s ++ t

val keyAndKeyVals = keyVals.map(kv => (kv.k, kv))

val keyAndNestedKeyVals = keyAndKeyVals.
  aggregateByKey(List[KeyVal]())(addToList, addLists).
  toDF("key", "keyvals")

keyAndNestedKeyVals.show

which produces:

+---+--------------------+
|key|             keyvals|
+---+--------------------+
|  1|[[1,4], [1,5], [1...|
|  2|[[2,4], [2,5], [2...|
|  3|[[3,4], [3,5], [3...|
+---+--------------------+

For a Dataset approach I tried the following, to no avail:

// Initialize as a Dataset
val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i, j)).toDS

// Build key -> KeyVal groupings
val keyValsByKey = keyVals.groupBy(kv => kv.k)

case class NestedKeyVal(key: Int, keyvals: List[KeyVal])

val convertToNested = (key: Int, keyValsIter: Iterator[KeyVal]) =>
  NestedKeyVal(key = key, keyvals = keyValsIter.toList)

val keyValsNestedByKey = keyValsByKey.mapGroups((key, keyvals) => convertToNested(key, keyvals))

keyValsNestedByKey.show

This and several other incantations using groupBy + mapGroups consistently give me serialization problems. Is this because the iterator cannot be guaranteed across boundaries, or is there some issue with what a Dataset can encode in the interim? What other ways might I approach this problem?

Thanks,
Lee
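For completeness, the untyped DataFrame route I have been considering as a fallback looks roughly like this. It is only a sketch: it assumes a Spark 2.x environment where collect_list accepts struct-typed columns, and the output schema (an array of structs rather than a List[KeyVal]) differs from the typed version above.

import org.apache.spark.sql.functions.{collect_list, struct}

// Untyped fallback sketch: group on the key column and collect the
// (k, v) structs into a single array column per key, then rename the
// grouping column to match the RDD version's schema.
val nestedDF = keyVals.toDF()
  .groupBy("k")
  .agg(collect_list(struct("k", "v")).as("keyvals"))
  .withColumnRenamed("k", "key")

nestedDF.show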