Can you give some outline as to what you mean? Should I broadcast a dataframe, 
and register the broadcasted df as a temp table? And then use a lookup UDF in a 
SELECT query?

I've managed to get it working by loading the 1.5GB dataset into an embedded 
Redis instance on the driver, and using mapPartitions on the big dataframe to 
map it to the required triples by doing the lookup from Redis. It took around 
13 minutes to load the data into Redis using 4 cores, and the subsequent map on 
the main dataset was quite fast. 
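
For reference, a rough sketch of the mapPartitions-plus-Redis approach described above (assumptions: a Redis instance already loaded with id -> number and reachable at redisHost, the Jedis client, and the column names; connection cleanup and error handling are omitted for brevity):

// Sketch only: redisHost, bigDF and the column names are placeholders.
import redis.clients.jedis.Jedis

val triples = bigDF.rdd.mapPartitions { rows =>
  val jedis = new Jedis(redisHost, 6379)   // one connection per partition
  rows.map { row =>
    val id = row.getAs[String]("id")
    val name = row.getAs[String]("name")
    (id, jedis.get(id), name)              // look up this id's number in Redis
  }
}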

From: gourav.sengu...@gmail.com
Date: Tue, 9 Aug 2016 21:13:51 +0100
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: mich.talebza...@gmail.com; samkiller....@gmail.com; deepakmc...@gmail.com; 
user@spark.apache.org

In case of skewed data the joins will mess things up. Try to write a UDF that 
does the lookup on a broadcast variable and then let me know the results. It 
should not take more than 40 mins on a 32 GB RAM system with 6-core processors.
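
A minimal sketch of that broadcast-lookup UDF idea (assumptions: the small table has columns "id" and "number" and fits in driver memory; sc, smallDF and bigDF are placeholder names, not from the thread):

import org.apache.spark.sql.functions.{col, udf}

val lookupMap = smallDF.rdd
  .map(r => (r.getAs[String]("id"), r.getAs[Int]("number")))
  .collectAsMap()
val bcLookup = sc.broadcast(lookupMap)

// -1 marks ids missing from the small side (an assumption; the thread says the mapping is one-to-one)
val lookupNumber = udf((id: String) => bcLookup.value.getOrElse(id, -1))
val result = bigDF.withColumn("number", lookupNumber(col("id")))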

Gourav
On Tue, Aug 9, 2016 at 6:02 PM, Ashic Mahtab <as...@live.com> wrote:



Hi Mich,

Hardware: AWS EMR cluster with 15 nodes of r3.2xlarge (CPU and RAM are fine; 
disk is a couple of hundred gigs).

When we do:

onPointFiveTB.join(onePointFiveGig.cache(), "id")

we're seeing that the temp directory is filling up fast, until a node gets 
killed. And then everything dies. 
-Ashic. 

From: mich.talebza...@gmail.com
Date: Tue, 9 Aug 2016 17:25:23 +0100
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: samkiller....@gmail.com; deepakmc...@gmail.com; user@spark.apache.org

Hi Sam,

What is your Spark hardware spec: number of nodes, RAM per node, and disks, please?

I don't understand this; it should not really be an issue. Underneath the bonnet it 
is a hash join. The small table, I gather, can be cached, and the big table will 
do multiple passes using the temp space.
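
A quick way to see which physical join Spark actually picks for this query (a sketch; a and b are the DataFrames from the original post):

a.join(b, Seq("id"), "right_outer").explain(true)
// Look for BroadcastHashJoin vs SortMergeJoin in the printed physical plan.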
HTH

Dr Mich Talebzadeh

LinkedIn: 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction
of data or any other property which may arise from relying on this email's 
technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from such
loss, damage or destruction.  



On 9 August 2016 at 15:46, Ashic Mahtab <as...@live.com> wrote:



Hi Sam,

Yup. It seems it stalls when broadcasting. CPU goes to 100%, but there's 
no progress. The Spark UI doesn't even show up.

-Ashic. 

From: samkiller....@gmail.com
Date: Tue, 9 Aug 2016 16:21:27 +0200
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: deepakmc...@gmail.com; user@spark.apache.org

Have you tried to broadcast your small table in order to perform your join?

joined = bigDF.join(broadcast(smallDF), ....)
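
The same suggestion spelled out with the import it needs (a sketch; "id" as the join key is taken from the thread, and bigDF/smallDF are placeholder names):

import org.apache.spark.sql.functions.broadcast

val joined = bigDF.join(broadcast(smallDF), Seq("id"))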

On Tue, Aug 9, 2016 at 3:29 PM, Ashic Mahtab <as...@live.com> wrote:



Hi Deepak,

No... not really. Upping the disk size is a solution, but more expensive, as you 
can't easily attach EBS volumes to EMR clusters configured with data pipelines 
(which is what we're doing). I've tried collecting the 1.5G dataset into a 
hashmap and broadcasting it. Timeouts seem to prevent that (even after upping 
the max driver result size). Increasing partition counts didn't help (the 
shuffle used up the temp space). I'm now looking at some form of clever 
broadcasting, or maybe falling back to chunking up the input, producing interim 
output, and unioning them for the final output. Might even try using Spark 
Streaming pointed at the parquet and seeing if that helps. 

-Ashic. 
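
For reference, the two knobs referred to above (a sketch; the values are illustrative, not recommendations from the thread):

// Driver-side result size limit, set at submit time:
//   spark-submit --conf spark.driver.maxResultSize=4g ...
// Broadcast timeout for SQL broadcasts, in seconds (Spark 1.x API):
sqlContext.setConf("spark.sql.broadcastTimeout", "1200")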

From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 17:31:19 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com

Hi Ashic,

Did you find the resolution to this issue? Just curious to know what helped in 
this scenario.

Thanks,
Deepak


On Tue, Aug 9, 2016 at 12:23 AM, Ashic Mahtab <as...@live.com> wrote:



Hi Deepak,

Thanks for the response. 

Registering the temp tables didn't help. Here's what I have:

val a = sqlContext.read.parquet(...).select("eid.id", "name").withColumnRenamed("eid.id", "id")
val b = sqlContext.read.parquet(...).select("id", "number")

a.registerTempTable("a")
b.registerTempTable("b")

val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x JOIN b y ON x.id = y.id")

results.write.parquet(...)

Is there something I'm missing?

Cheers,
Ashic.
From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 00:01:32 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: user@spark.apache.org

Register your dataframes as temp tables and then try the join on the temp 
tables. This should resolve your issue.

Thanks,
Deepak
On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab <as...@live.com> wrote:



Hello,

We have two parquet inputs of the following form:

a: id:String, Name:String  (1.5TB)
b: id:String, Number:Int  (1.3GB)

We need to join these two to get (id, Number, Name). We've tried two approaches:

a.join(b, Seq("id"), "right_outer")

where a and b are dataframes. We also tried taking the RDDs, mapping them to 
pair RDDs with id as the key, and then joining. What we're seeing is that temp 
file usage is increasing on the join stage, and filling up our disks, causing 
the job to crash. Is there a way to join these two datasets without 
well... crashing?
Note: the ids are unique, and there's a one-to-one mapping between the two 
datasets. 
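
For clarity, a rough sketch of the pair-RDD variant mentioned above (the column extraction is an assumption; the post only says "id as the key"):

val aPairs = a.rdd.map(r => (r.getAs[String]("id"), r.getAs[String]("Name")))
val bPairs = b.rdd.map(r => (r.getAs[String]("id"), r.getAs[Int]("Number")))
val joinedRdd = bPairs.join(aPairs)   // shuffles both sides; this is where the temp space fills up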
Any help would be appreciated.
-Ashic. 


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net