I worked up the following for grouping a DataFrame by a key and aggregating the grouped rows into arrays. It works, but I think it is horrible. Is there a better way, especially one that does not require dropping down to RDDs? This is a common pattern for us: we often want to explode JSON arrays, do something to enrich the exploded rows, then collapse them back into a structure similar to the pre-exploded one, but containing the enriched data. collect_list seems to be the pattern I am looking for, but it only works with Hive and only on primitives. Help? Thanks.
    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types.{ArrayType, StructField, StructType}

    // Group df by the given columns and collect the values of arrayCol
    // from each group into a single array column.
    def groupToArray(df: DataFrame, groupByCols: Seq[String], arrayCol: String): DataFrame = {
      val sourceSchema = df.schema
      // The output column holds an array of whatever type arrayCol had.
      val arrayField = StructField(arrayCol, ArrayType(sourceSchema(arrayCol).dataType))
      val groupByIndexes = groupByCols.map(colName => sourceSchema.fieldIndex(colName))
      val arrayIndex = sourceSchema.fieldIndex(arrayCol)
      val destSchema = StructType(groupByCols.map(colName => sourceSchema(colName)) :+ arrayField)
      val rowRdd = df
        .rdd
        .groupBy(r => groupByIndexes.map(r(_)))
        .map { case (_, rowsIter) =>
          val rowValues = rowsIter.head.toSeq
          // Materialize as a Seq so it maps cleanly onto the ArrayType column.
          val arr = rowsIter.map(r => r(arrayIndex)).toSeq
          val keys = groupByIndexes.map(ndx => rowValues(ndx))
          Row.fromSeq(keys :+ arr)
        }
      df.sqlContext.createDataFrame(rowRdd, destSchema)
    }
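For context, the heavy lifting in the function above is just a groupBy on the key columns followed by collecting one field from each row in the group. The same shape on plain Scala collections, with a hypothetical Record type and sample data (no Spark involved), looks like this:

```scala
// Hypothetical exploded records: one row per original array element,
// with tag standing in for the enriched field.
case class Record(id: Int, tag: String)

val exploded = Seq(Record(1, "a"), Record(1, "b"), Record(2, "c"))

// Group by the key and collapse the enriched field back into a sequence,
// mirroring what groupToArray does with Rows and schema indexes.
val collapsed: Map[Int, Seq[String]] =
  exploded.groupBy(_.id).map { case (id, recs) => id -> recs.map(_.tag) }

println(collapsed(1)) // List(a, b)
```

The Spark version is the same idea, except the key and value positions must be looked up through the schema because Row is untyped.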