On Wed, Feb 8, 2017 at 1:14 PM, ayan guha <guha.a...@gmail.com> wrote:
> Will a SQL solution be acceptable?
>

I'm very curious to see how it'd be done in raw SQL if you're up for it! I
think the two programmatic solutions so far are viable, though, too.

(By the way, thanks everyone for the great suggestions!)
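A rough sketch of what a raw Spark SQL version might look like, in case it's useful. This isn't from the thread; it assumes Spark 2.x, a purely illustrative temp view name of "raw_rows", and that first(expr, ignoreNulls) is usable as a SQL aggregate. It's the same dense_rank plus conditional-first idea as the DataFrame versions quoted below:

// Register the example input under an illustrative view name.
data.createOrReplaceTempView("raw_rows")

val sqlCollapsed = sqlContext.sql("""
  WITH ranked AS (
    SELECT id, name, extra, data, priority,
           dense_rank() OVER (PARTITION BY id, name ORDER BY priority) AS rnk
    FROM raw_rows
  )
  SELECT id, name,
         first(CASE WHEN rnk = 1 THEN extra    END, true) AS extra1,
         first(CASE WHEN rnk = 1 THEN data     END, true) AS data1,
         first(CASE WHEN rnk = 1 THEN priority END, true) AS priority1,
         first(CASE WHEN rnk = 2 THEN extra    END, true) AS extra2,
         first(CASE WHEN rnk = 2 THEN data     END, true) AS data2,
         first(CASE WHEN rnk = 2 THEN priority END, true) AS priority2,
         first(CASE WHEN rnk = 3 THEN extra    END, true) AS extra3,
         first(CASE WHEN rnk = 3 THEN data     END, true) AS data3,
         first(CASE WHEN rnk = 3 THEN priority END, true) AS priority3
  FROM ranked
  GROUP BY id, name
""")

sqlCollapsed.show()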
>
> On Thu, 9 Feb 2017 at 4:01 am, Xiaomeng Wan <shawn...@gmail.com> wrote:
>
>> You could also try pivot.
>>
>> On 7 February 2017 at 16:13, Everett Anderson <ever...@nuna.com.invalid> wrote:
>>
>> On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>> I think the fastest way is likely to use a combination of conditionals
>> (when / otherwise), first (ignoring nulls), while grouping by the id.
>> This should get the answer with only a single shuffle.
>>
>> Here is an example
>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html>.
>>
>> Very cool! Using the simpler aggregates feels cleaner.
>>
>> On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi Everett,
>>
>> That's pretty much what I'd do. Can't think of a way to beat your
>> solution. Why do you "feel vaguely uneasy about it"?
>>
>> Maybe it felt like I was unnecessarily grouping-by twice, but probably
>> mostly that I hadn't used pivot before.
>>
>> Interestingly, the physical plans are not especially different between
>> these two solutions after the rank column is added. They both have two
>> SortAggregates that seem to be figuring out where to put results based
>> on the rank:
>>
>> My original one:
>>
>> == Physical Plan ==
>> *Project [id#279, name#280, 1#372.extra AS extra1#409, 1#372.data AS data1#435, 1#372.priority AS priority1#462, 2#374.extra AS extra2#490, 2#374.data AS data2#519, 2#374.priority AS priority2#549, 3#376.extra AS extra3#580, 3#376.data AS data3#612, 3#376.priority AS priority3#645]
>> +- SortAggregate(key=[id#279,name#280], functions=[first(if ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null, true),first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312 else null, true),first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312 else null, true)])
>>    +- SortAggregate(key=[id#279,name#280], functions=[partial_first(if ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null, true),partial_first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312 else null, true),partial_first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312 else null, true)])
>>       +- *Project [id#279, name#280, rank#292, struct(extra#281, data#282, priority#283) AS temp_struct#312]
>>          +- Window [denserank(priority#283) windowspecdefinition(id#279, name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#292], [id#279, name#280], [priority#283 ASC]
>>             +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
>>                +- Exchange hashpartitioning(id#279, name#280, 200)
>>                   +- Scan ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]
>>
>> And modifying Michael's slightly to use a rank:
>>
>> import org.apache.spark.sql.functions._
>>
>> def getColumnWithRank(column: String, rank: Int) = {
>>   first(when(col("rank") === lit(rank), col(column)).otherwise(null), ignoreNulls = true)
>> }
>>
>> val withRankColumn = data.withColumn("rank",
>>   functions.dense_rank().over(Window.partitionBy("id", "name").orderBy("priority")))
>>
>> val modCollapsed = withRankColumn
>>   .groupBy($"id", $"name")
>>   .agg(
>>     getColumnWithRank("data", 1) as 'data1,
>>     getColumnWithRank("data", 2) as 'data2,
>>     getColumnWithRank("data", 3) as 'data3,
>>     getColumnWithRank("extra", 1) as 'extra1,
>>     getColumnWithRank("extra", 2) as 'extra2,
>>     getColumnWithRank("extra", 3) as 'extra3)
>>
>> modCollapsed.explain
>>
>> == Physical Plan ==
>> SortAggregate(key=[id#279,name#280], functions=[first(CASE WHEN (rank#965 = 1) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 1) THEN extra#281 ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null END, true)])
>> +- SortAggregate(key=[id#279,name#280], functions=[partial_first(CASE WHEN (rank#965 = 1) THEN data#282 ELSE null END, true),partial_first(CASE WHEN (rank#965 = 2) THEN data#282 ELSE null END, true),partial_first(CASE WHEN (rank#965 = 3) THEN data#282 ELSE null END, true),partial_first(CASE WHEN (rank#965 = 1) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null END, true)])
>>    +- *Project [id#279, name#280, extra#281, data#282, rank#965]
>>       +- Window [denserank(priority#283) windowspecdefinition(id#279, name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#965], [id#279, name#280], [priority#283 ASC]
>>          +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
>>             +- Exchange hashpartitioning(id#279, name#280, 200)
>>                +- Scan ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]
>>
>> I'd also check out the execution plan (with explain) to see how it's
>> gonna work at runtime. I may have seen groupBy + join be better than
>> window (there were more exchanges in play for windows I reckon).
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ever...@nuna.com> wrote:
>> >
>> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
>> >> help here too.
>> >
>> > This seems to work, but I do feel vaguely uneasy about it. :)
>> >
>> > // First add a 'rank' column which is priority order just in case
>> > // priorities aren't from 1 with no gaps.
>> > val temp1 = data.withColumn("rank", functions.dense_rank()
>> >   .over(Window.partitionBy("id", "name").orderBy("priority")))
>> >
>> > +---+----+-----+------+--------+----+
>> > | id|name|extra|  data|priority|rank|
>> > +---+----+-----+------+--------+----+
>> > |  1|Fred|    8|value1|       1|   1|
>> > |  1|Fred|    8|value8|       2|   2|
>> > |  1|Fred|    8|value5|       3|   3|
>> > |  2| Amy|    9|value3|       1|   1|
>> > |  2| Amy|    9|value5|       2|   2|
>> > +---+----+-----+------+--------+----+
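A hedged variation, not from the thread above: dense_rank handles priority gaps, but if two rows in the same (id, name) group ever shared a priority they would get the same rank, and first() would then keep an arbitrary one of them. Something like row_number() guarantees one distinct rank per row (ties are still broken arbitrarily). The temp1Unique name is just for illustration:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions

// Same shape as temp1, but every row in a group gets a unique rank value.
val temp1Unique = data.withColumn("rank", functions.row_number()
  .over(Window.partitionBy("id", "name").orderBy("priority")))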
>> > // Now move all the columns we want to denormalize into a struct column
>> > // to keep them together.
>> > val temp2 = temp1.withColumn("temp_struct",
>> >     struct(temp1("extra"), temp1("data"), temp1("priority")))
>> >   .drop("extra", "data", "priority")
>> >
>> > +---+----+----+------------+
>> > | id|name|rank| temp_struct|
>> > +---+----+----+------------+
>> > |  1|Fred|   1|[8,value1,1]|
>> > |  1|Fred|   2|[8,value8,2]|
>> > |  1|Fred|   3|[8,value5,3]|
>> > |  2| Amy|   1|[9,value3,1]|
>> > |  2| Amy|   2|[9,value5,2]|
>> > +---+----+----+------------+
>> >
>> > // groupBy again, but now pivot the rank column. We need an aggregate
>> > // function after pivot, so use first -- there will only ever be one element.
>> > val temp3 = temp2.groupBy("id", "name")
>> >   .pivot("rank", Seq("1", "2", "3"))
>> >   .agg(functions.first("temp_struct"))
>> >
>> > +---+----+------------+------------+------------+
>> > | id|name|           1|           2|           3|
>> > +---+----+------------+------------+------------+
>> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
>> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
>> > +---+----+------------+------------+------------+
>> >
>> > // Now just move things out of the structs and clean up.
>> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
>> >   .withColumn("data1", temp3("1").getField("data"))
>> >   .withColumn("priority1", temp3("1").getField("priority"))
>> >   .withColumn("extra2", temp3("2").getField("extra"))
>> >   .withColumn("data2", temp3("2").getField("data"))
>> >   .withColumn("priority2", temp3("2").getField("priority"))
>> >   .withColumn("extra3", temp3("3").getField("extra"))
>> >   .withColumn("data3", temp3("3").getField("data"))
>> >   .withColumn("priority3", temp3("3").getField("priority"))
>> >   .drop("1", "2", "3")
>> >
>> > +---+----+------+------+---------+------+------+---------+------+------+---------+
>> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
>> > +---+----+------+------+---------+------+------+---------+------+------+---------+
>> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|
>> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|     null|
>> > +---+----+------+------+---------+------+------+---------+------+------+---------+
>>
>> >> Jacek
>> >>
>> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I'm trying to un-explode or denormalize a table like
>> >>>
>> >>> +---+----+-----+------+--------+
>> >>> |id |name|extra|data  |priority|
>> >>> +---+----+-----+------+--------+
>> >>> |1  |Fred|8    |value1|1       |
>> >>> |1  |Fred|8    |value8|2       |
>> >>> |1  |Fred|8    |value5|3       |
>> >>> |2  |Amy |9    |value3|1       |
>> >>> |2  |Amy |9    |value5|2       |
>> >>> +---+----+-----+------+--------+
>> >>>
>> >>> into something that looks like
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>> >>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
>> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
>> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
>> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>> >>>
>> >>> If I were going the other direction, I'd create a new column with an
>> >>> array of structs, each with 'extra', 'data', and 'priority' fields, and
>> >>> then explode it.
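Purely for illustration (not from the thread), a minimal sketch of that reverse, wide-to-long direction, assuming the wide "output" DataFrame built by the pivot solution above; wideToLong is just an illustrative name:

import org.apache.spark.sql.functions._
import sqlContext.implicits._  // for the $"..." column syntax

// Pack each (extraN, dataN, priorityN) triple into a struct, collect the three
// structs into an array, explode back to one row per struct, then flatten.
val wideToLong = output
  .withColumn("entries", array(
    struct($"extra1".as("extra"), $"data1".as("data"), $"priority1".as("priority")),
    struct($"extra2".as("extra"), $"data2".as("data"), $"priority2".as("priority")),
    struct($"extra3".as("extra"), $"data3".as("data"), $"priority3".as("priority"))))
  .select($"id", $"name", explode($"entries").as("e"))
  .where($"e.priority".isNotNull)  // drop the null padding for groups with fewer than 3 rows
  .select($"id", $"name", $"e.extra".as("extra"), $"e.data".as("data"),
    $"e.priority".as("priority"))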
>> >>> Going from the more normalized view, though, I'm having a harder time.
>> >>>
>> >>> I want to group or partition by (id, name) and order by priority, but
>> >>> after that I can't figure out how to get multiple rows rotated into one.
>> >>>
>> >>> Any ideas?
>> >>>
>> >>> Here's the code to create the input table above:
>> >>>
>> >>> import org.apache.spark.sql.Row
>> >>> import org.apache.spark.sql.Dataset
>> >>> import org.apache.spark.sql.types._
>> >>>
>> >>> val rowsRDD = sc.parallelize(Seq(
>> >>>   Row(1, "Fred", 8, "value1", 1),
>> >>>   Row(1, "Fred", 8, "value8", 2),
>> >>>   Row(1, "Fred", 8, "value5", 3),
>> >>>   Row(2, "Amy", 9, "value3", 1),
>> >>>   Row(2, "Amy", 9, "value5", 2)))
>> >>>
>> >>> val schema = StructType(Seq(
>> >>>   StructField("id", IntegerType, nullable = true),
>> >>>   StructField("name", StringType, nullable = true),
>> >>>   StructField("extra", IntegerType, nullable = true),
>> >>>   StructField("data", StringType, nullable = true),
>> >>>   StructField("priority", IntegerType, nullable = true)))
>> >>>
>> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
> Best Regards,
> Ayan Guha