Hmm, hash partitioning will send all of the records with the same key to the same partition / machine. I’d try it out, and hope that if you have a few very large keys whose records don’t fit in the RAM of one node, they spill to disk. Maybe play with persist() and a different StorageLevel.
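
If it helps, here’s a rough sketch in PySpark of what I mean (the paths and the ‘id’ column are the placeholders from the thread below; MEMORY_AND_DISK is just one StorageLevel to experiment with):

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.getOrCreate()

# repartition both sides on the join key, then persist with a storage level
# that lets partitions too big for RAM spill to local disk
a = spark.read.parquet("path_to_table_a").repartition("id")
b = spark.read.parquet("path_to_table_b").repartition("id")

a.persist(StorageLevel.MEMORY_AND_DISK)
b.persist(StorageLevel.MEMORY_AND_DISK)

# materialize so the shuffle and the persist actually happen
a.count()
b.count()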
> On Aug 11, 2016, at 9:48 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
> Hi Ben,
>
> and that will take care of skewed data?
>
> Gourav
>
> On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
> When you read both 'a' and 'b', can you try repartitioning both by column 'id'?
> If you .cache() and .count() to force a shuffle, it'll push the records that will be joined to the same executors.
>
> So;
> a = spark.read.parquet('path_to_table_a').repartition('id').cache()
> a.count()
>
> b = spark.read.parquet('path_to_table_b').repartition('id').cache()
> b.count()
>
> And then join..
>
>> On Aug 8, 2016, at 8:17 PM, Ashic Mahtab <as...@live.com> wrote:
>>
>> Hello,
>> We have two parquet inputs of the following form:
>>
>> a: id:String, Name:String (1.5TB)
>> b: id:String, Number:Int (1.3GB)
>>
>> We need to join these two to get (id, Number, Name). We've tried two approaches:
>>
>> a.join(b, Seq("id"), "right_outer")
>>
>> where a and b are dataframes. We also tried taking the rdds, mapping them to pair rdds with id as the key, and then joining. What we're seeing is that temp file usage is increasing on the join stage, and filling up our disks, causing the job to crash. Is there a way to join these two data sets without well...crashing?
>>
>> Note, the ids are unique, and there's a one to one mapping between the two datasets.
>>
>> Any help would be appreciated.
>>
>> -Ashic.
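
For completeness, the end-to-end flow Ben described would look roughly like this in PySpark (placeholder paths as above; the right_outer join is taken from Ashic’s original snippet):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# read and hash-partition both tables on the join key, then cache and
# materialize so the shuffle happens up front
a = spark.read.parquet("path_to_table_a").repartition("id").cache()
a.count()

b = spark.read.parquet("path_to_table_b").repartition("id").cache()
b.count()

# both sides are now partitioned on 'id', so matching keys are co-located
joined = a.join(b, ["id"], "right_outer")  # columns: id, Name, Number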