Is there a way to do aggregateByKey on Datasets the way one can on an RDD?

Consider the following RDD code, which builds a set of KeyVals into a DataFrame
containing a column with the KeyVals' keys and a column containing lists of
KeyVals.  The end goal is to join it with other collections that will be
similarly transformed (a rough sketch of that join follows the output below).

case class KeyVal(k: Int, v: Int)


val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i, j))

// function for appending to list
val addToList = (s: List[KeyVal], v: KeyVal) => s :+ v

// function for merging two lists
val addLists = (s: List[KeyVal], t: List[KeyVal]) => s ++ t

val keyAndKeyVals = keyVals.map(kv => (kv.k, kv))
val keyAndNestedKeyVals = keyAndKeyVals.
  aggregateByKey(List[KeyVal]())(addToList, addLists).
  toDF("key", "keyvals")
keyAndNestedKeyVals.show


which produces:

+---+--------------------+
|key|             keyvals|
+---+--------------------+
|  1|[[1,4], [1,5], [1...|
|  2|[[2,4], [2,5], [2...|
|  3|[[3,4], [3,5], [3...|
+---+--------------------+
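To make the end goal concrete, the join I am after would look roughly like this
once a second collection has been collapsed the same way (a sketch only; the
OtherVal case class and its "othervals" column are hypothetical stand-ins):

case class OtherVal(k: Int, w: Int)

// hypothetical second collection, nested with the same aggregateByKey pattern
val otherVals = sc.parallelize(for (i <- 1 to 3; j <- 7 to 9) yield OtherVal(i, j))
val keyAndNestedOtherVals = otherVals.
  map(ov => (ov.k, ov)).
  aggregateByKey(List[OtherVal]())((s, v) => s :+ v, (s, t) => s ++ t).
  toDF("key", "othervals")

// one row per key, carrying both nested lists
keyAndNestedKeyVals.join(keyAndNestedOtherVals, "key").show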

For a Dataset approach, I tried the following to no avail:

// Initialize as a Dataset
val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i, j)).
  toDS

// Group the KeyVals by their key
val keyValsByKey = keyVals.groupBy(kv => kv.k)

case class NestedKeyVal(key: Int, keyvals: List[KeyVal])

val convertToNested = (key: Int, keyValsIter: Iterator[KeyVal]) =>
  NestedKeyVal(key = key, keyvals = keyValsIter.toList)

val keyValsNestedByKey = keyValsByKey.mapGroups((key, keyvals) =>
  convertToNested(key, keyvals))
keyValsNestedByKey.show


This and several other incantations using groupBy + mapGroups consistently
give me serialization problems.  Is this because the iterator cannot be
guaranteed across boundaries?
Or is there some issue with what a Dataset can encode in the interim?  What
other ways might I approach this problem?
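
For reference, one alternative I have been toying with is to fall back to the
untyped DataFrame API and use collect_list (a sketch only, untested on my end;
it assumes Spark 2.x, where the collect_list/struct functions work without
Hive, and the spark-shell implicits for the $ column syntax):

import org.apache.spark.sql.functions.{collect_list, struct}

// group on the key column and gather whole KeyVal structs into a list
val nestedViaDF = keyVals.toDF().
  groupBy($"k").
  agg(collect_list(struct($"k", $"v")).as("keyvals")).
  withColumnRenamed("k", "key")
nestedViaDF.show

That seems to produce the same key/keyvals shape, but it drops out of the typed
API, which is why I am hoping there is a Dataset-native equivalent of
aggregateByKey.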

Thanks,
Lee
