Already tried that. The CPU hits 100% on the collectAsMap (even tried foreaching
into a Java ConcurrentHashMap), and it eventually finishes, but the broadcast then
takes a while, eventually hits some timeout, and the worker is killed. The driver
(and workers) have more than enough RAM (1.5GB of parquet expands to about 4.5GB,
and the nodes have 64GB RAM). Filtering is also not an option, as every entry of
the "smaller" dataset exists in the large one.
As mentioned in another reply, I managed to get it working by running embedded
Redis on the driver, loading the smaller dataset into it, and then doing a
straight map over the larger dataset via foreachPartition, doing lookups against
the driver's Redis. Since there's no network shuffle, the temp folder is barely
touched, and it seems to work quite well.
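
Roughly, the shape of it (a simplified sketch, not the exact code; Jedis as the
client, the literal driver address, and the RDD element types are placeholders):

import redis.clients.jedis.Jedis

// smallRDD: RDD[(String, Int)] -- (id, number); bigRDD: RDD[(String, String)] -- (id, name)
// Load the small dataset into the driver-local Redis (this part runs on the driver).
val localRedis = new Jedis("localhost", 6379)
smallRDD.toLocalIterator.foreach { case (id, number) =>
  localRedis.set(id, number.toString)
}
localRedis.close()

val driverHost = "10.0.0.1"  // placeholder: driver address reachable from the executors

// Map over the big dataset partition by partition, one Redis connection per partition.
val joined = bigRDD.mapPartitions { iter =>
  val redis = new Jedis(driverHost, 6379)
  val rows = iter.map { case (id, name) =>
    (id, Option(redis.get(id)).map(_.toInt), name)
  }.toList  // materialise before closing the connection
  redis.close()
  rows.iterator
}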
-Ashic.

From: zouz...@gmail.com
Date: Wed, 10 Aug 2016 08:22:24 +0200
Subject: Re: Spark join and large temp files
To: as...@live.com

Hi Ashic,
I think this approach should solve your problem, i.e., broadcasting the small
RDD. However, you should do it properly.
IMO, you should try something like:

import org.apache.spark.broadcast.Broadcast

val smallRDDBroadcasted: Broadcast[collection.Map[Int, YourValueType]] = sc.broadcast(smallRDD.collectAsMap())

bigRDD.mapPartitions { elems =>
  // Here manually join using the broadcast map
  elems.flatMap { case (key, value) =>
    smallRDDBroadcasted.value.get(key).map(x => (key, (value, x)))
  }
}
Ensure that your driver has enough memory to store the above map. If you run out
of memory on the driver, increase the driver memory.
Speaking of which, a filtering step might also help here, i.e., filter the bigRDD
with the keys of the map before joining.
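
For example, reusing the broadcast value from the snippet above (just a sketch):

// Keep only the big-RDD rows whose key appears in the broadcast map,
// so the manual join touches less data.
val filteredBigRDD = bigRDD.filter { case (key, _) =>
  smallRDDBroadcasted.value.contains(key)
}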
Hope this helps,
Anastasios

On Tue, Aug 9, 2016 at 4:46 PM, Ashic Mahtab <as...@live.com> wrote:



Hi Sam,
Yup. It seems to stall when broadcasting. CPU goes to 100%, but there's no
progress. The Spark UI doesn't even show up.
-Ashic. 

From: samkiller....@gmail.com
Date: Tue, 9 Aug 2016 16:21:27 +0200
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: deepakmc...@gmail.com; user@spark.apache.org

Have you tried broadcasting your small table in order to perform your join?

joined = bigDF.join(broadcast(smallDF), ...)
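
For example (the "id" join column is taken from the original question; broadcast
is the hint from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.broadcast

// Broadcast the small side so the join happens map-side, avoiding a shuffle
// of the 1.5TB DataFrame.
val joined = bigDF.join(broadcast(smallDF), Seq("id"))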

On Tue, Aug 9, 2016 at 3:29 PM, Ashic Mahtab <as...@live.com> wrote:



Hi Deepak,
No... not really. Upping the disk size is a solution, but more expensive, as you
can't easily attach EBS volumes to EMR clusters configured with data pipelines
(which is what we're doing). I've tried collecting the 1.5G dataset into a
hashmap and broadcasting. Timeouts seem to prevent that (even after upping the
max driver result size). Increasing partition counts didn't help (the shuffle
used up the temp space). I'm now looking at some form of clever broadcasting, or
maybe falling back to chunking up the input, producing interim output, and
unioning them for the final output. Might even try using Spark Streaming pointing
at the parquet and seeing if that helps.
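
For instance, something along these lines (untested sketch; smallDF/bigDF are the
two inputs, the idea being to split the small side into broadcast-sized chunks by
hashing the id, broadcast-join each chunk against the big side, and union the
interim outputs; relies on the ids being unique):

import org.apache.spark.sql.functions.{broadcast, col, crc32}

val numChunks = 8  // assumption: each chunk of the small side fits a broadcast
val chunks = (0 until numChunks).map { i =>
  val smallChunk = smallDF.where(crc32(col("id")) % numChunks === i)
  bigDF.join(broadcast(smallChunk), Seq("id"))
}
val results = chunks.reduce(_ unionAll _)  // union on Spark 2.x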
-Ashic. 

From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 17:31:19 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com

Hi Ashic,
Did you find the resolution to this issue? Just curious to know what helped in
this scenario.
Thanks,
Deepak


On Tue, Aug 9, 2016 at 12:23 AM, Ashic Mahtab <as...@live.com> wrote:



Hi Deepak,
Thanks for the response.
Registering the temp tables didn't help. Here's what I have:

val a = sqlContext.read.parquet(...).select("eid.id", "name").withColumnRenamed("eid.id", "id")
val b = sqlContext.read.parquet(...).select("id", "number")

a.registerTempTable("a")
b.registerTempTable("b")

val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x join b y on x.id = y.id")

results.write.parquet(...)

Is there something I'm missing?
Cheers,
Ashic.
From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 00:01:32 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: user@spark.apache.org

Register your dataframes as temp tables and then try the join on the temp tables.
This should resolve your issue.
Thanks,
Deepak
On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab <as...@live.com> wrote:



Hello,
We have two parquet inputs of the following form:

a: id: String, Name: String  (1.5TB)
b: id: String, Number: Int  (1.3GB)

We need to join these two to get (id, Number, Name). We've tried two approaches:

a.join(b, Seq("id"), "right_outer")

where a and b are dataframes. We also tried taking the rdds, mapping them to pair
rdds with id as the key, and then joining. What we're seeing is that temp file
usage increases during the join stage and fills up our disks, causing the job to
crash. Is there a way to join these two data sets without, well... crashing?
Note, the ids are unique, and there's a one-to-one mapping between the two
datasets.
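
The RDD attempt was roughly along these lines (a sketch, not the exact code; the
row access is simplified):

// Key both sides by id and join; this still shuffles both datasets.
val aPairs = a.rdd.map(r => (r.getAs[String]("id"), r.getAs[String]("Name")))
val bPairs = b.rdd.map(r => (r.getAs[String]("id"), r.getAs[Int]("Number")))
val joinedRdd = bPairs.join(aPairs).map { case (id, (number, name)) => (id, number, name) }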
Any help would be appreciated.
-Ashic. 





-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


-- 
Anastasios Zouzias
