Hi Michael,

As I understand broadcast joins, Jestin could also use the broadcast function on a Dataset to mark it for broadcasting. He could force the broadcast directly, without the filter-and-union trick, and hope the broadcast join kicks in. Correct?
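Something along these lines is what I have in mind (just a sketch, with df1 and df2 standing in for Jestin's ~150 GB and ~50 GB datasets and id for the join column):

import org.apache.spark.sql.functions.broadcast

// Mark the smaller side for broadcasting; the planner should then choose a
// broadcast join even above spark.sql.autoBroadcastJoinThreshold. Whether the
// hint is honoured also depends on the join type (Jestin's is an outer join).
val joined = df1.join(broadcast(df2), Seq("id"))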
Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0  http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Sun, Aug 14, 2016 at 9:51 AM, Michael Armbrust <mich...@databricks.com> wrote:
> Have you tried doing the join in two parts (id == 0 and id != 0) and then
> doing a union of the results? It is possible that with this technique, the
> join which only contains skewed data would be filtered enough to allow
> broadcasting of one side.
>
> On Sat, Aug 13, 2016 at 11:15 PM, Jestin Ma <jestinwith.a...@gmail.com> wrote:
>>
>> Hi, I'm currently trying to perform an outer join between two
>> DataFrames/Datasets, one ~150 GB and one ~50 GB, on a column, id.
>>
>> df1.id is skewed in that there are many 0's, the rest being unique IDs.
>>
>> df2.id is not skewed. If I filter df1.id != 0, then the join works well.
>> If I don't, the join does not complete for a very, very long time.
>>
>> I have traced this problem to the hash partitioning on IDs: because of the
>> data skew, one partition contains a huge number of values, and one executor
>> ends up reading most of the shuffle data and writing all of the shuffle
>> data, as shown below.
>>
>> [screenshot: the task in question, assigned to a single executor]
>>
>> [screenshot from one of the executors: a single thread spilling sort data,
>> since the executor cannot hold 90%+ of the ~200 GB result in memory]
>>
>> Moreover, looking at the event timeline, I find that the executor on that
>> task spends about 20% of its time reading shuffle data, 70% on computation,
>> and 10% writing output data.
>>
>> I have tried the following:
>>
>> "Salting" the 0-value keys with monotonically_increasing_id().mod(N)
>> - This doesn't seem to have an effect, since now I have hundreds/thousands
>> of keys with tens of thousands of occurrences.
>> - Should I increase N? Is there a way to just do random.mod(N) instead of
>> monotonically_increasing_id()?
>>
>> Repartitioning according to a column I know contains unique values
>> - This is overridden by Spark's sort-based shuffle manager, which hash
>> repartitions on the skewed column.
>> - Is it possible to change this? Or will the join column need to be hashed
>> and partitioned on for joins to work?
>>
>> Broadcasting does not work for my large tables.
>>
>> Increasing/decreasing spark.sql.shuffle.partitions does not remedy the
>> skewed-data problem, as the 0-value keys are still hashed to the same
>> partition.
>>
>> ----------------------------------
>>
>> What I am considering currently is doing the join at the RDD level, but is
>> there any level of control there which can solve my skewed data problem?
>> Other than that, see the bolded question.
>>
>> I would appreciate any suggestions/tips/experience with this. Thank you!
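For concreteness, a rough sketch of the two-part join Michael describes, assuming the join key is id and that the id == 0 slice of df2 ends up small enough to broadcast. It is written as a plain join for simplicity, so reproducing Jestin's outer-join semantics across the two parts would need extra care:

import org.apache.spark.sql.functions.{broadcast, col}

// Split the skewed side on the hot key (id == 0).
val hotRows  = df1.filter(col("id") === 0)
val restRows = df1.filter(col("id") =!= 0)

// The non-skewed part joins as usual.
val joinedRest = restRows.join(df2, Seq("id"))

// The skewed part only needs the matching slice of df2, which may now be
// small enough to broadcast.
val joinedHot = hotRows.join(broadcast(df2.filter(col("id") === 0)), Seq("id"))

// Stitch the two partial results back together.
val joined = joinedRest.union(joinedHot)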
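And a sketch of the salting variant Jestin asks about, using rand() instead of monotonically_increasing_id(). The bucket count N is a placeholder, and again the outer-join semantics are glossed over:

import org.apache.spark.sql.functions.{array, col, explode, lit, rand, when}

val N = 100  // number of salt buckets for the hot key (placeholder)

// Salt only the hot key (id == 0) on the skewed side; all other rows keep
// salt 0, so their join behaviour is unchanged. rand() gives the random
// assignment asked about in place of monotonically_increasing_id().mod(N).
val df1Salted = df1.withColumn(
  "salt",
  when(col("id") === 0, (rand() * N).cast("int")).otherwise(lit(0)))

// Replicate the id == 0 rows of the other side across all N salt values so
// every salted row on the left still finds its match; leave the rest alone.
val df2Hot = df2.filter(col("id") === 0)
  .withColumn("salt", explode(array((0 until N).map(i => lit(i)): _*)))
val df2Rest = df2.filter(col("id") =!= 0).withColumn("salt", lit(0))

val joinedSalted = df1Salted
  .join(df2Hot.union(df2Rest), Seq("id", "salt"))
  .drop("salt")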