Thanks for the information. When I mentioned map-side join, I meant that each
partition of one DF joins with the partition holding the same keys of DF 2 on
the worker node, without shuffling the data. In other words, do as much work as
possible within the worker node before shuffling the data.
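For illustration, here is a minimal sketch (spark-shell style, with made-up
data) of the behaviour I have in mind at the RDD level: once both sides share
the same partitioner, the join itself should not need a shuffle.

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(100)

// partitionBy shuffles each side once up front...
val left  = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(part)
val right = sc.parallelize(Seq((1, "x"), (2, "y"))).partitionBy(part)

// ...but because both sides now share the same partitioner, the join is
// planned with narrow dependencies: partition N of `left` is joined with
// partition N of `right` on the same worker, with no further shuffle.
val joined = left.join(right)
println(joined.toDebugString)  // no extra shuffle stage for the join itself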

Thanks
Darshan Singh



On Wed, Apr 6, 2016 at 10:06 PM, Yong Zhang <java8...@hotmail.com> wrote:

> I think I gave you some misleading information.
>
> If you have 2 already partitioned (K, V) RDDs, and join them by K, then
> the correct plan you should see is HashJoin, instead of SortMerge.
>
> My guess is that when you see the SortMerge join in the DF plan, Spark is not
> using the most efficient way of joining in this case.
>
> At the RDD level, this is already mature, but I don't know about the DataFrame
> level, so someone else may be able to give you more info on how to achieve that
> at the DataFrame level.
>
> I am not sure I understand your map-side join question. If one DF is very
> small and the other one is much bigger, then you want to try a map join. But
> you already partitioned both DFs, so why would you want a map-side join then?
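>
> For what it's worth, a minimal sketch of what a map join looks like on the
> DataFrame side (placeholder names; assumes one side is small enough to fit
> in memory on each executor):
>
> import org.apache.spark.sql.functions.broadcast
>
> // Broadcasting the small side lets the join happen map-side, without
> // shuffling the large DataFrame.
> val joined = large_df.join(broadcast(small_df), large_df("movie") === small_df("movie"))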
>
> Yong
>
> ------------------------------
> Date: Wed, 6 Apr 2016 21:03:16 +0100
> Subject: Re: Plan issue with spark 1.5.2
> From: darshan.m...@gmail.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
>
> Thanks a lot for this. I was thinking of using cogrouped RDDs. We will try
> to move to 1.6 as there are other issues as well in 1.5.2.
>
> The same code is much faster in 1.6.1, but plan-wise I do not see much
> difference. Why is it still partitioning, then sorting, and then joining?
>
> I expect it to sort within the same partition based on title (another join
> column), then join the partitions, and only then shuffle the results.
>
> Another question: how can we specify the same partitioner when saving, say,
> these 2 dataframes as Parquet files, or when reading from Parquet files which
> were partitioned on the movie column, so that the map-side join can happen? I
> do not want to have to repartition the data I read from the Parquet files with
> a hash partitioner on the movie column.
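>
> For context, this is the kind of repartition-after-read step I would like to
> avoid (a rough sketch with a placeholder path; as far as I can tell the
> partitioner information is not preserved in the Parquet files themselves):
>
> val movies = sqlContext.read.parquet("/path/to/movies.parquet")
>
> // Re-key by the movie column and hash-partition again after reading back.
> val part_movies = sqlContext.createDataFrame(
>   movies.rdd.map(r => (r.getInt(0), r))
>     .partitionBy(new org.apache.spark.HashPartitioner(200)).values,
>   movies.schema)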
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#409L])
> +- TungstenExchange SinglePartition, None
>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#442L])
>       +- Project
>          +- SortMergeJoin [movie#290,title#291], [movie#308,title#309]
>             :- Sort [movie#290 ASC,title#291 ASC], false, 0
>             :  +- TungstenExchange hashpartitioning(movie#290,title#291,200), None
>             :     +- InMemoryColumnarTableScan [movie#290,title#291], InMemoryRelation [movie#290,title#291,genres#292], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
>             +- Sort [movie#308 ASC,title#309 ASC], false, 0
>                +- TungstenExchange hashpartitioning(movie#308,title#309,200), None
>                   +- InMemoryColumnarTableScan [movie#308,title#309], InMemoryRelation [movie#308,title#309,genres#310], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
>
>
> Thanks
>
>
> On Wed, Apr 6, 2016 at 8:37 PM, Yong Zhang <java8...@hotmail.com> wrote:
>
> What you are looking for is
> https://issues.apache.org/jira/browse/SPARK-4849
>
> This feature is available in Spark 1.6.0, so the DataFrame can reuse the
> partitioned data in the join.
>
> For your case in 1.5.x, you have to use the RDD way to tell Spark that the
> join should utilize the presorted data.
>
> The way you did it won't work no matter what, as Spark won't know
> that part_movies_1 and part_movies_2 are indeed partitioned the same way.
>
> At the RDD level, you need to keep the (K, V) RDDs and partition the keys on
> both sides the same way, as you did below, then do the join at the RDD level.
> Spark will then know that it can reuse the pre-partitioned (K, V) RDDs and
> just merge-join the 2 RDDs.
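>
> Something along these lines (a rough sketch only, reusing your column
> positions; adapt it to your schema):
>
> import org.apache.spark.HashPartitioner
>
> val part = new HashPartitioner(200)
>
> // Keep the key instead of dropping it with .values, and use the same
> // partitioner instance on both sides.
> val left  = movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(part)
> val right = movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(part)
>
> // Because both keyed RDDs share the partitioner, the join reuses the
> // existing partitioning instead of shuffling again.
> val joined = left.join(right)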
>
> If you lose the key (you built the DF only on the values part), then there
> is no way Spark will know that the values part is indeed already partitioned.
>
> I don't think this can be done at the DataFrame level in 1.5.x. If this is
> wrong, please let me know.
>
> The execution plan is in fact doing a SortMergeJoin (which is correct in this
> case), but I think Spark will sort both DFs again, even though you already
> partitioned them.
>
> Yong
>
> ------------------------------
> Date: Wed, 6 Apr 2016 20:10:14 +0100
> Subject: Re: Plan issue with spark 1.5.2
> From: darshan.m...@gmail.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
>
> I used 1.5.2. I have used the movies data to reproduce the issue. Below is the
> physical plan. I am not sure why it is hash partitioning the data, then
> sorting, and then joining. I expect the data to be joined first and then sent
> for further processing.
>
> What I sort of expect is a common partitioner that works on, say, a column: it
> would partition the dataframe on that column into a given number of buckets
> and try to keep the data as physically close together as possible, i.e.
> colocated. If it then sees 2 tables with the same partition columns, and the
> partition column is part of the join condition, it should try to join them at
> the partition level first and only then shuffle the data for further processing.
>
> Below are the queries:
>
> // Read movies from the Parquet-backed table.
> val movies_1 = sqlContext.sql("select * from movies")
> val movies_2 = sqlContext.sql("select * from movies")
>
> // Key each row by the movie column (column 0), partition both sides with
> // my_partitioner (a partitioner on the movie key, defined elsewhere), then
> // drop the key and rebuild a DataFrame with the original schema.
> val part_movies_1 = sqlContext.createDataFrame(
>   movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
>   movies_1.schema
> )
> val part_movies_2 = sqlContext.createDataFrame(
>   movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
>   movies_2.schema
> )
>
> part_movies_1.persist()
> part_movies_2.persist()
> part_movies_1.registerTempTable("part_movies_1")
> part_movies_2.registerTempTable("part_movies_2")
> // Look at the Storage tab in the Spark UI to confirm both DFs are cached.
>
> val sql1 = sqlContext.sql("select * from part_movies_1 pm1 inner join part_movies_2 pm2 on pm1.movie = pm2.movie and pm1.title = pm2.title")
>
> sql1.count() // The plan below is for this count statement.
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#750L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#783L])
>    TungstenProject
>     SortMergeJoin [movie#622,title#623], [movie#640,title#641]
>      TungstenSort [movie#622 ASC,title#623 ASC], false, 0
>       TungstenExchange hashpartitioning(movie#622,title#623)
>        ConvertToUnsafe
>         InMemoryColumnarTableScan [movie#622,title#623], (InMemoryRelation [movie#622,title#623,genres#624], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#622,title#623,genres#624]), None)
>      TungstenSort [movie#640 ASC,title#641 ASC], false, 0
>       TungstenExchange hashpartitioning(movie#640,title#641)
>        ConvertToUnsafe
>         InMemoryColumnarTableScan [movie#640,title#641], (InMemoryRelation [movie#640,title#641,genres#642], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#640,title#641,genres#642]), None)
>
> Please let me know if you need further information.
>
>
> On Tue, Apr 5, 2016 at 6:33 PM, Yong Zhang <java8...@hotmail.com> wrote:
>
> You need to show us the execution plan, so we can understand what your issue
> is.
>
> Use spark-shell code to show how your DFs are built and how you partition
> them, then call explain(true) on your join DF and paste the output here, so
> we can better help you.
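>
> For example, something as simple as this (placeholder names) would do:
>
> // Print both the logical and physical plans for the join.
> val joinedDF = df1.join(df2, df1("country") === df2("country"))
> joinedDF.explain(true)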
>
> Yong
>
> > Date: Tue, 5 Apr 2016 09:46:59 -0700
> > From: darshan.m...@gmail.com
> > To: user@spark.apache.org
> > Subject: Plan issue with spark 1.5.2
> >
> >
> > I am using spark 1.5.2. I have a question regarding the plan generated by
> > spark. I have 3 data-frames which hold the data for different countries. I
> > have around 150 countries and the data is skewed.
> >
> > 95% of my queries will have country as a criterion. However, I have seen
> > issues with the plans generated for queries which have country as a join column.
> >
> > The data-frames are partitioned based on the country. Not only are these
> > dataframes co-partitioned, they are co-located as well. E.g. the data for UK
> > in data-frames df1, df2 and df3 will be on the same HDFS datanode.
> >
> > Then when I join these 3 tables, with country as one of the join columns, I
> > assume the join should be a map-side join, but it shuffles the data from the
> > 3 dataframes and then joins using the shuffled data. Apart from country
> > there are other columns in the join.
> >
> > Is this the correct behavior? If it is an issue, is it fixed in the latest
> > versions?
> >
> > Thanks
> >
> >
> >
> > --
> > View this message in context:
> > http://apache-spark-user-list.1001560.n3.nabble.com/Plan-issue-with-spark-1-5-2-tp26681.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>
>
>
>
