Hmm, hash partitioning sends all of the records with the same key to the 
same partition / machine.
I’d try it out, and hope that if you have a few very large keys whose records 
don’t fit in the RAM of one node, they spill to disk. Maybe play with persist() 
and a different StorageLevel.
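
Something along these lines, roughly (an untested sketch; the parquet paths and 
the 'id' column are just the placeholders from the mail below, and spark is 
assumed to be the SparkSession):

from pyspark import StorageLevel

# Repartition both sides on the join key so matching ids land in the same
# partitions, and allow partitions that don't fit in memory to spill to disk.
a = (spark.read.parquet('path_to_table_a')
     .repartition('id')
     .persist(StorageLevel.MEMORY_AND_DISK))
a.count()

b = (spark.read.parquet('path_to_table_b')
     .repartition('id')
     .persist(StorageLevel.MEMORY_AND_DISK))
b.count()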

> On Aug 11, 2016, at 9:48 PM, Gourav Sengupta <gourav.sengu...@gmail.com> 
> wrote:
> 
> Hi Ben,
> 
> and that will take care of skewed data?
> 
> Gourav 
> 
> On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
> When you read both 'a' and 'b', can you try repartitioning both by column 'id'?
> If you .cache() and .count() to force a shuffle, it'll push the records that 
> will be joined to the same executors. 
> 
> So;
> a = spark.read.parquet('path_to_table_a').repartition('id').cache()
> a.count()
> 
> b = spark.read.parquet('path_to_table_b').repartition('id').cache()
> b.count()
> 
> And then join..
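> # e.g., a sketch of that last step, joining on the shared 'id' column:
> joined = a.join(b, 'id')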
> 
> 
>> On Aug 8, 2016, at 8:17 PM, Ashic Mahtab <as...@live.com> wrote:
>> 
>> Hello,
>> We have two parquet inputs of the following form:
>> 
>> a: id:String, Name:String  (1.5TB)
>> b: id:String, Number:Int  (1.3GB)
>> 
>> We need to join these two to get (id, Number, Name). We've tried two 
>> approaches:
>> 
>> a.join(b, Seq("id"), "right_outer")
>> 
>> where a and b are DataFrames. We also tried taking the RDDs, mapping them to 
>> pair RDDs with id as the key, and then joining. What we're seeing is that 
>> temp file usage is increasing on the join stage, and filling up our disks, 
>> causing the job to crash. Is there a way to join these two data sets without 
>> well...crashing?
>> 
>> Note, the ids are unique, and there's a one to one mapping between the two 
>> datasets. 
>> 
>> Any help would be appreciated.
>> 
>> -Ashic. 
> 
> 
