This is interesting. See the notebook below; it is in 1.6.

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5228339421202847/186877441366454/2805387300416006/latest.html

You create the 2 dataframes from the partitioned parquet files,

persist them,

and run the same query: it uses a sort merge join.

Do not persist the 2 dataframes and it uses a hash join :). I guess the
in-memory partitions are somehow not able to carry this info.
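
A minimal sketch of the experiment (paths, table aliases, and the exact query are
hypothetical; movie/title are the join columns used later in this thread):

// read the same partitioned parquet source twice
val df1 = sqlContext.read.parquet("/data/part_movies")
val df2 = sqlContext.read.parquet("/data/part_movies")
df1.registerTempTable("pm1")
df2.registerTempTable("pm2")

// Run 1: persist both dataframes, then run the join -- the plan shows a sort merge join.
df1.persist(); df2.persist()
sqlContext.sql("select * from pm1 a inner join pm2 b on a.movie = b.movie and a.title = b.title").count()

// Run 2: the same query without the persist calls -- the plan shows a hash join instead.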

Thanks


On Wed, Apr 6, 2016 at 10:30 PM, Yong Zhang <java8...@hotmail.com> wrote:

> Got it.
>
> In the old MapReduce/Hive world, mapJoin means the broadcast join in
> Spark, so I thought you were looking for a broadcast join in this case.
>
> What you describe is exactly the hash join. The most correct way is a hash
> join, provided that the DFs you are joining are already partitioned the same
> way on the join fields.
>
> You shouldn't see any more shuffling if it works.
>
> Yong
>
> ------------------------------
> Date: Wed, 6 Apr 2016 22:11:38 +0100
> Subject: Re: Plan issue with spark 1.5.2
> From: darshan.m...@gmail.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
>
> Thanks for the information. When I mentioned map side join, I meant that
> each partition from DF 1 joins with the partition with the same key from DF 2
> on the worker node, without shuffling the data. In other words, do as much work
> as possible within the worker node before shuffling the data.
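>
> A minimal RDD-level sketch of what I mean (rdd1/rdd2 and the partition count
> are placeholders; both sides use the same partitioner so the join does not
> reshuffle):
>
> import org.apache.spark.HashPartitioner
>
> val p = new HashPartitioner(8)
> val left  = rdd1.partitionBy(p)   // RDD[(K, V1)], hash-partitioned
> val right = rdd2.partitionBy(p)   // RDD[(K, V2)], partitioned the same way
>
> // Each output partition is built from the matching left/right partitions,
> // so no extra shuffle step is needed for the join itself.
> val joined = left.join(right)     // RDD[(K, (V1, V2))]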
>
> Thanks
> Darshan Singh
>
>
>
> On Wed, Apr 6, 2016 at 10:06 PM, Yong Zhang <java8...@hotmail.com> wrote:
>
> I think I gave you one piece of misleading information.
>
> If you have 2 already partitioned (K, V) RDDs, and join them by K, then
> the correct plan you should see is HashJoin, instead of SortMerge.
>
> My guess is that when you see the SortMerge join in the DF, Spark isn't
> using the most efficient way of joining in this case.
>
> At the RDD level this is already mature, but I don't know about the DataFrame
> level, so maybe someone else can give you more info on how to achieve that at
> the DataFrame level.
>
> I am not sure I understand the map side join question you have. If one DF is
> very small and the other one is much bigger, then you want to try a map join.
> But you already partitioned both DFs, so why do you want a map-side join then?
>
> Yong
>
> ------------------------------
> Date: Wed, 6 Apr 2016 21:03:16 +0100
> Subject: Re: Plan issue with spark 1.5.2
> From: darshan.m...@gmail.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
>
> Thanks a lot for this. I was thinking of using cogrouped RDDs. We will try
> to move to 1.6 as there are other issues in 1.5.2 as well.
>
> The same code is much faster in 1.6.1, but plan-wise I do not see much
> difference. Why is it still partitioning, then sorting, and then joining?
>
> I expect it to sort within the same partition based on title (another join
> column), then join the partitions, and only then shuffle the results.
>
> Another question: how can we specify the same partitioner when saving, say,
> these 2 dataframes as parquet files, or when reading from parquet files that
> were partitioned on the movie column, so that the map side join can happen? I
> do not want to have to repartition the data I read from the parquet files with
> a hash partitioner on the movie column.
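>
> (For concreteness, the only partitioned write I can think of is
> DataFrameWriter.partitionBy, sketched below with hypothetical paths. Note that
> this is directory-layout partitioning of the files; on its own it does not seem
> to tell the DataFrame planner that the data is hash-partitioned for the join.)
>
> // writes one .../movie=<value>/ directory per movie value
> part_movies_1.write.partitionBy("movie").parquet("/data/part_movies_1")
> part_movies_2.write.partitionBy("movie").parquet("/data/part_movies_2")
>
> // reading back preserves the layout, but the join plan still shows an Exchange
> val read_1 = sqlContext.read.parquet("/data/part_movies_1")
> val read_2 = sqlContext.read.parquet("/data/part_movies_2")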
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#409L])
> +- TungstenExchange SinglePartition, None
>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#442L])
>       +- Project
>          +- SortMergeJoin [movie#290,title#291], [movie#308,title#309]
>             :- Sort [movie#290 ASC,title#291 ASC], false, 0
>             :  +- TungstenExchange hashpartitioning(movie#290,title#291,200), None
>             :     +- InMemoryColumnarTableScan [movie#290,title#291], InMemoryRelation [movie#290,title#291,genres#292], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
>             +- Sort [movie#308 ASC,title#309 ASC], false, 0
>                +- TungstenExchange hashpartitioning(movie#308,title#309,200), None
>                   +- InMemoryColumnarTableScan [movie#308,title#309], InMemoryRelation [movie#308,title#309,genres#310], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
>
>
> Thanks
>
>
> On Wed, Apr 6, 2016 at 8:37 PM, Yong Zhang <java8...@hotmail.com> wrote:
>
> What you are looking for is
> https://issues.apache.org/jira/browse/SPARK-4849
>
> This feature is available in Spark 1.6.0, so the DataFrame can reuse the
> partitioned data in the join.
>
> For your case in 1.5.x, you have to use the RDD way to tell Spark that the
> join should utilize the presorted data.
>
> The way you did it won't work no matter what, as Spark won't know
> that part_movies_1 and part_movies_2 are indeed partitioned the same way.
>
> At the RDD level, you need to keep the (K,V) RDDs and partition both sides'
> keys the same way, as you did below, then do the join at the RDD level. Spark
> will then know that it can reuse the pre-partitioned (K,V) RDDs and just merge
> join the 2 RDDs.
>
> If you lose the key (you built the DF only on the values part), then there
> is no way Spark will know that the values part is indeed already partitioned.
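>
> A minimal sketch of that RDD-level route, reusing the names from your code
> below (my_partitioner as you defined it; assuming the first column is the key):
>
> // keep the key through the partitioning AND the join; do not drop it with .values yet
> val kv_1 = movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner)
> val kv_2 = movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner)
>
> // both sides share the same partitioner, so the join reuses the partitioning
> // instead of shuffling both sides again
> val joined = kv_1.join(kv_2)   // RDD[(Int, (Row, Row))]
>
> // only after the join would you drop the keys and, if needed, build a DataFrame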
>
> I don't think this can be done at the DataFrame level in 1.5.x. If this is
> wrong, please let me know.
>
> The execution plan is in fact doing a SortMerge (which is correct in this
> case), but I think Spark will sort both DFs again, even though you already
> partitioned them.
>
> Yong
>
> ------------------------------
> Date: Wed, 6 Apr 2016 20:10:14 +0100
> Subject: Re: Plan issue with spark 1.5.2
> From: darshan.m...@gmail.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
>
> I used 1.5.2. I have used the movies data to reproduce the issue. Below is the
> physical plan. I am not sure why it is hash partitioning the data, then
> sorting, and then joining. I expect the data to be joined first and then sent
> for further processing.
>
> I sort of expect a common partitioner which works on, say, a column: it would
> partition the dataframe on that column into a given number of buckets and try
> to keep the data physically as close as possible, i.e. colocated. If it then
> sees 2 tables with the same partition columns, and the partition column is part
> of the join condition, it would join them at the partition level first and only
> then shuffle the data for further processing.
>
> Below are the queries:
>
> //read movies from parquet files.
> val movies_1 = sqlContext.sql("select * from movies")
> val movies_2 = sqlContext.sql("select * from movies")
>
> val part_movies_1 = sqlContext.createDataFrame(
>   movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
>   movies_1.schema
> )
> val part_movies_2 = sqlContext.createDataFrame(
>   movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
>   movies_2.schema
> )
>
> part_movies_1.persist()
> part_movies_2.persist()
> part_movies_1.registerTempTable("part_movies_1")
> part_movies_2.registerTempTable("part_movies_2")
> //look at storage in sparkUI
>
> val sql1 = sqlContext.sql("select * from part_movies_1 pm1 inner join part_movies_2 pm2 on pm1.movie = pm2.movie and pm1.title = pm2.title")
>
> sql1.count() // the plan below is for this count statement
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#750L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#783L])
>    TungstenProject
>     SortMergeJoin [movie#622,title#623], [movie#640,title#641]
>      TungstenSort [movie#622 ASC,title#623 ASC], false, 0
>       TungstenExchange hashpartitioning(movie#622,title#623)
>        ConvertToUnsafe
>         InMemoryColumnarTableScan [movie#622,title#623], (InMemoryRelation [movie#622,title#623,genres#624], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#622,title#623,genres#624]), None)
>      TungstenSort [movie#640 ASC,title#641 ASC], false, 0
>       TungstenExchange hashpartitioning(movie#640,title#641)
>        ConvertToUnsafe
>         InMemoryColumnarTableScan [movie#640,title#641], (InMemoryRelation [movie#640,title#641,genres#642], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#640,title#641,genres#642]), None)
>
> Please let me know if you need further information.
>
>
> On Tue, Apr 5, 2016 at 6:33 PM, Yong Zhang <java8...@hotmail.com> wrote:
>
> You need to show us the execution plan, so we can understand what your
> issue is.
>
> Use the spark-shell code to show how your DFs are built and how you partition
> them, then call explain(true) on your join DF and show the output here, so we
> can help you better.
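>
> For example, something like this (df1/df2 and the join condition are just
> placeholders):
>
> val joined = df1.join(df2, df1("movie") === df2("movie"))
> joined.explain(true)   // prints the logical and physical plans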
>
> Yong
>
> > Date: Tue, 5 Apr 2016 09:46:59 -0700
> > From: darshan.m...@gmail.com
> > To: user@spark.apache.org
> > Subject: Plan issue with spark 1.5.2
> >
> >
> > I am using Spark 1.5.2. I have a question regarding a plan generated by Spark.
> > I have 3 data-frames which hold the data for different countries. I have
> > around 150 countries and the data is skewed.
> >
> > 95% of my queries will have country as a criterion. However, I have seen
> > issues with the plans generated for queries which have country as a join column.
> >
> > The data-frames are partitioned based on the country. Not only are these
> > dataframes co-partitioned, they are co-located as well. E.g. data for the UK
> > in data-frames df1, df2, and df3 will be on the same HDFS datanode.
> >
> > Then when I join these 3 tables and country is one of the join columns, I
> > assume that the join should be a map side join, but it shuffles the data from
> > the 3 dataframes and then joins using the shuffled data. Apart from country
> > there are other columns in the join.
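> >
> > To make it concrete, the join is of this shape (the column names other than
> > country are placeholders):
> >
> > val joined = df1
> >   .join(df2, df1("country") === df2("country") && df1("other_col") === df2("other_col"))
> >   .join(df3, df1("country") === df3("country") && df1("other_col") === df3("other_col"))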
> >
> > Is this the correct behavior? If it is an issue, is it fixed in later versions?
> >
> > Thanks
> >
> >
> >
>
>
>
>
>
