Hi,
I'm trying to un-explode or denormalize a table like
+---+----+-----+------+--------+
|id |name|extra|data  |priority|
+---+----+-----+------+--------+
|1  |Fred|8    |value1|1       |
|1  |Fred|8    |value8|2       |
|1  |Fred|8    |value5|3       |
|2  |Amy |9    |value3|1       |
|2  |Amy |9    |value5|2       |
+---+----+-----+------+--------+
into something that looks like
+---+----+------+------+---------+------+------+---------+------+------+---------+
|id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
+---+----+------+------+---------+------+------+---------+------+------+---------+
|1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
|2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
+---+----+------+------+---------+------+------+---------+------+------+---------+
If I were going the other direction, I'd create a new column with an array
of structs, each with 'extra', 'data', and 'priority' fields and then
explode it.
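To make that concrete, here's a rough, untested sketch of that wide-to-tall
direction (assuming a DataFrame called 'wide' that already has the
extraN/dataN/priorityN columns):

import sqlContext.implicits._
import org.apache.spark.sql.functions._

// Pack each (extraN, dataN, priorityN) triple into a struct, explode the
// array of structs into one row apiece, then flatten the struct fields.
val tall = wide
  .withColumn("item", explode(array(
    struct($"extra1".as("extra"), $"data1".as("data"), $"priority1".as("priority")),
    struct($"extra2".as("extra"), $"data2".as("data"), $"priority2".as("priority")),
    struct($"extra3".as("extra"), $"data3".as("data"), $"priority3".as("priority")))))
  .select($"id", $"name", $"item.extra", $"item.data", $"item.priority")
  .where($"priority".isNotNull)  // drop the null padding on short groups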
Going from the more normalized view, though, I'm having a harder time.
I want to group or partition by (id, name) and order by priority, but after
that I can't figure out how to get multiple rows rotated into one.
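The closest I can sketch (untested, and assuming the pivot API added in
Spark 1.6) is grouping by (id, name) and pivoting on priority, but it needs
the priority values enumerated up front, and I believe the columns come out
named "1_extra", "1_data", etc. rather than "extra1":

import sqlContext.implicits._
import org.apache.spark.sql.functions._

// Pivot on priority; first() picks the single value in each
// (group, priority) cell. Missing priorities come out as nulls.
val wide = data
  .groupBy($"id", $"name")
  .pivot("priority", Seq(1, 2, 3))  // values must be known in advance
  .agg(first($"extra").as("extra"),
       first($"data").as("data"),
       first($"priority").as("priority"))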
Any ideas?
Here's the code to create the input table above:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Five normalized rows: each (id, name) group has one row per priority.
val rowsRDD = sc.parallelize(Seq(
  Row(1, "Fred", 8, "value1", 1),
  Row(1, "Fred", 8, "value8", 2),
  Row(1, "Fred", 8, "value5", 3),
  Row(2, "Amy", 9, "value3", 1),
  Row(2, "Amy", 9, "value5", 2)))

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("extra", IntegerType, nullable = true),
  StructField("data", StringType, nullable = true),
  StructField("priority", IntegerType, nullable = true)))

val data = sqlContext.createDataFrame(rowsRDD, schema)