Hi Michael,

As I understand broadcast joins, Jestin could also use the broadcast
function on a Dataset to mark it for broadcasting. That would force the
broadcast directly, without the filter-and-union trick, in the hope
that the broadcast kicks in. Correct?
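
Something like this, as a sketch (df1 and df2 follow Jestin's example;
note the planner can ignore the hint for join types it cannot
broadcast):

  import org.apache.spark.sql.functions.broadcast

  // broadcast(df2) marks the smaller side to be shipped to all executors
  val joined = df1.join(broadcast(df2), Seq("id"), "left_outer")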

Best regards,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Aug 14, 2016 at 9:51 AM, Michael Armbrust
<mich...@databricks.com> wrote:
> Have you tried doing the join in two parts (id == 0 and id != 0) and
> then doing a union of the results?  It is possible that, with this
> technique, the part of the join that only contains the skewed data
> would be filtered enough to allow broadcasting of one side.
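>
> Roughly, as a sketch (assuming a left outer join from df1, so each df1
> row falls into exactly one branch; filtering df2 down to id == 0 should
> shrink the skewed side enough to broadcast it):
>
>   import org.apache.spark.sql.functions.{broadcast, col}
>
>   val skewed  = df1.filter(col("id") === 0)
>     .join(broadcast(df2.filter(col("id") === 0)), Seq("id"), "left_outer")
>   val regular = df1.filter(col("id") =!= 0)
>     .join(df2, Seq("id"), "left_outer")
>   val result  = skewed.union(regular)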
>
> On Sat, Aug 13, 2016 at 11:15 PM, Jestin Ma <jestinwith.a...@gmail.com>
> wrote:
>>
>> Hi, I'm currently trying to perform an outer join between two
>> DataFrames/Datasets on a column, id; one is ~150 GB, the other ~50 GB.
>>
>> df1.id is skewed in that there are many 0's, the rest being unique IDs.
>>
>> df2.id is not skewed. If I filter with df1.id != 0, the join works
>> well. If I don't, the join does not complete even after a very, very
>> long time.
>>
>> I have diagnosed this as a problem with the hash partitioning on the
>> IDs: due to the data skew, one partition ends up containing most of the
>> values, and one executor ends up reading most of the shuffle data and
>> writing all of it, as shown below.
>>
>> [image: Spark UI screenshot of the task in question, assigned to a
>> single executor]
>>
>> [image: screenshot from one of the executors, showing a single thread
>> spilling sort data because the executor cannot hold 90%+ of the
>> ~200 GB result in memory]
>>
>> Moreover, looking at the event timeline, I find that the executor on
>> that task spends about 20% of its time reading shuffle data, 70% on
>> computation, and 10% writing output data.
>>
>> I have tried the following:
>>
>> "Salting" the 0-value keys by monotonically_increasing_id().mod(N)
>> - This doesn't seem to have an effect since now I have hundreds/thousands
>> of keys with tens of thousands of occurrences.
>> - Should I increase N? Is there a way to just do random.mod(N) instead of
>> monotonically_increasing_id()?
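>>
>> What I have in mind for the random version, as a sketch (rand is in
>> org.apache.spark.sql.functions; N is an illustrative bucket count):
>>
>>   import org.apache.spark.sql.functions.{col, floor, lit, rand, when}
>>
>>   val N = 200
>>   // random salt for the skewed key only; all other keys keep salt 0
>>   val salted = df1.withColumn("salt",
>>     when(col("id") === 0, floor(rand() * N).cast("int")).otherwise(lit(0)))
>>
>> with df2's id == 0 rows then replicated across all N salt values before
>> joining on (id, salt).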
>>
>> Repartitioning by a column that I know contains unique values
>>
>> - This is overridden by Spark's sort-based shuffle manager, which
>> hash-repartitions on the skewed join column
>>
>> - Is it possible to change this? Or does the join column have to be
>> hashed and partitioned on for the join to work?
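>>
>> Schematically (uniqueCol stands for that column):
>>
>>   import org.apache.spark.sql.functions.col
>>
>>   df1.repartition(col("uniqueCol"))
>>     .join(df2, Seq("id"), "outer")
>>     .explain()
>>   // the plan still inserts Exchange hashpartitioning(id) on both
>>   // sides, so the pre-partitioning by uniqueCol is discarded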
>>
>> Broadcasting does not work for my large tables
>>
>> Increasing/decreasing spark.sql.shuffle.partitions does not remedy the
>> skew, as the 0-value keys are still hashed to the same partition.
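>>
>> For example (assuming the SparkSession handle is spark):
>>
>>   spark.conf.set("spark.sql.shuffle.partitions", "2000")
>>
>> The 0-value key still hashes to exactly one of those partitions, no
>> matter how many there are.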
>>
>>
>> ----------------------------------
>>
>> What I am currently considering is doing the join at the RDD level,
>> but is there any level of control there that could solve my
>> skewed-data problem? Other than that, see the questions above.
>>
>> I would appreciate any suggestions/tips/experience with this. Thank you!
>>
>
