Hi,

I have a table which is built directly from an S3 location, and even a self join on that cached table causes the data to be read from S3 again.
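For context, this is roughly how the cached table and the failing self join were set up (a sketch only, reconstructed from the plan below; the exact code differs, and the filter values are taken from the plan):

    # Sketch: reconstructed from the query plan further down, not the exact code.
    omniture = hiveContext.sql("""
        select scv_id as user, programme_key
        from omnitureweb_log
        where hit_month in ('2015-11') and hit_day in ('20')
          and is_logged_in = 1 and is_4od_video_view = 1""")
    omniture.registerTempTable("omniture")
    omniture.cache()
    omniture.count()   # materialise the cache

    # Self join on the cached table; explain(True) prints the plans below.
    hiveContext.sql("""
        select count(1)
        from omniture left_table join omniture right_table
          on left_table.programme_key = right_table.programme_key""").explain(True)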
The query plan is below:

== Parsed Logical Plan ==
Aggregate [count(1) AS count#1804L]
 Project [user#0,programme_key#515]
  Join Inner, Some((programme_key#515 = programme_key#1802))
   Subquery left_table
    Subquery omniture
     Project [scv_id#4 AS user#0,programme_key#515]
      Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as string))) && (is_logged_in#8L = cast(1 as bigint))) && (is_4od_video_view#40L = cast(1 as bigint)))
       MetastoreRelation default, omnitureweb_log, None
   Subquery right_table
    Subquery omniture
     Project [scv_id#1291 AS user#771,programme_key#1802]
      Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (cast(20 as string))) && (is_logged_in#1295L = cast(1 as bigint))) && (is_4od_video_view#1327L = cast(1 as bigint)))
       MetastoreRelation default, omnitureweb_log, None

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#1804L]
 Project [user#0,programme_key#515]
  Join Inner, Some((programme_key#515 = programme_key#1802))
   Subquery left_table
    Subquery omniture
     Project [scv_id#4 AS user#0,programme_key#515]
      Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as string))) && (is_logged_in#8L = cast(1 as bigint))) && (is_4od_video_view#40L = cast(1 as bigint)))
       MetastoreRelation default, omnitureweb_log, None
   Subquery right_table
    Subquery omniture
     Project [scv_id#1291 AS user#771,programme_key#1802]
      Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (cast(20 as string))) && (is_logged_in#1295L = cast(1 as bigint))) && (is_4od_video_view#1327L = cast(1 as bigint)))
       MetastoreRelation default, omnitureweb_log, None

== Optimized Logical Plan ==
Aggregate [count(1) AS count#1804L]
 Project
  Join Inner, Some((programme_key#515 = programme_key#1802))
   Project [programme_key#515]
    InMemoryRelation [user#0,programme_key#515], true, 10000, StorageLevel(true, true, false, true, 1), (Project [scv_id#4 AS user#0,programme_key#515]), None
   Project [programme_key#1802]
    Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (20)) && (is_logged_in#1295L = 1)) && (is_4od_video_view#1327L = 1))
     MetastoreRelation default, omnitureweb_log, None

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#1804L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#1817L])
   TungstenProject
    SortMergeJoin [programme_key#515], [programme_key#1802]
     TungstenSort [programme_key#515 ASC], false, 0
      TungstenExchange hashpartitioning(programme_key#515)
       ConvertToUnsafe
        InMemoryColumnarTableScan [programme_key#515], (InMemoryRelation [user#0,programme_key#515], true, 10000, StorageLevel(true, true, false, true, 1), (Project [scv_id#4 AS user#0,programme_key#515]), None)
     TungstenSort [programme_key#1802 ASC], false, 0
      TungstenExchange hashpartitioning(programme_key#1802)
       ConvertToUnsafe
        Project [programme_key#1802]
         Filter ((is_logged_in#1295L = 1) && (is_4od_video_view#1327L = 1))
          HiveTableScan [programme_key#1802,is_logged_in#1295L,is_4od_video_view#1327L], (MetastoreRelation default, omnitureweb_log, None), [hit_month#1289 IN (2015-11),hit_day#1290 IN (20)]

Code Generation: true

Note that only the left side of the join reads from the cache (InMemoryColumnarTableScan over the InMemoryRelation); the right side falls back to a HiveTableScan on the metastore relation, i.e. it goes back to S3.

Regards,
Gourav

On Fri, Dec 18, 2015 at 8:55 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> hi,
>
> I think that people have reported the same issue elsewhere, and this
> should be registered as a bug in Spark:
>
> https://forums.databricks.com/questions/2142/self-join-in-spark-sql.html
>
> Regards,
> Gourav
>
> On Thu, Dec 17, 2015 at 10:52 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
>> Hi Ted,
>>
>> The self join works fine on tables where the HiveContext tables are
>> direct Hive tables, therefore
>>
>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>> table1.registerTempTable("table1")
>> table1.cache()
>> table1.count()
>>
>> and if I do a self join on table1, things are quite fine.
>>
>> But in case we have something like this:
>>
>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>> table1.registerTempTable("table1")
>> table1.cache()
>> table1.count()
>>
>> table2 = hiveContext.sql("select columnA, columnB from hivetable2")
>> table2.registerTempTable("table2")
>> table2.cache()
>> table2.count()
>>
>> table3 = hiveContext.sql("select table1.* from table1, table2 where
>> table1.columnA = table2.columnA")
>> table3.registerTempTable("table3")
>> table3.cache()
>> table3.count()
>>
>> then the self join on table3 does not take data from the table3 cache,
>> nor from the table1 or table2 caches, but starts reading data directly
>> from S3 - which, as you would understand, does not make any sense.
>>
>> Regards,
>> Gourav
>>
>> On Wed, Dec 16, 2015 at 7:16 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> I did the following exercise in spark-shell ("c" is a cached table):
>>>
>>> scala> sqlContext.sql("select x.b from c x join c y on x.a = y.a").explain
>>> == Physical Plan ==
>>> Project [b#4]
>>> +- BroadcastHashJoin [a#3], [a#125], BuildRight
>>>    :- InMemoryColumnarTableScan [b#4,a#3], InMemoryRelation [a#3,b#4,c#5], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, Some(c)
>>>    +- InMemoryColumnarTableScan [a#125], InMemoryRelation [a#125,b#126,c#127], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, Some(c)
>>>
>>> scala> sqlContext.sql("select x.b, y.c from c x join c y on x.a = y.a").registerTempTable("d")
>>> scala> sqlContext.cacheTable("d")
>>>
>>> scala> sqlContext.sql("select x.b from d x join d y on x.c = y.c").explain
>>> == Physical Plan ==
>>> Project [b#4]
>>> +- SortMergeJoin [c#90], [c#253]
>>>    :- Sort [c#90 ASC], false, 0
>>>    :  +- TungstenExchange hashpartitioning(c#90,200), None
>>>    :     +- InMemoryColumnarTableScan [b#4,c#90], InMemoryRelation [b#4,c#90], true, 10000, StorageLevel(true, true, false, true, 1), Project [b#4,c#90], Some(d)
>>>    +- Sort [c#253 ASC], false, 0
>>>       +- TungstenExchange hashpartitioning(c#253,200), None
>>>          +- InMemoryColumnarTableScan [c#253], InMemoryRelation [b#246,c#253], true, 10000, StorageLevel(true, true, false, true, 1), Project [b#4,c#90], Some(d)
>>>
>>> Is the above what you observed?
>>>
>>> Cheers
>>>
>>> On Wed, Dec 16, 2015 at 9:34 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> This is how the data can be created:
>>>>
>>>> 1. TableA: cached()
>>>> 2. TableB: cached()
>>>> 3. TableC: TableA inner join TableB, cached()
>>>> 4. TableC join TableC does not take the data from cache but starts
>>>> reading the data for TableA and TableB from disk.
>>>>
>>>> Does this sound like a bug? Self joins on TableA and on TableB work
>>>> fine and take data from the cache.
>>>>
>>>> Regards,
>>>> Gourav
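P.S. A possible workaround until this is resolved (a sketch, not verified; the parquet path is illustrative): materialise the joined table and read it back, so that its plan no longer carries lineage back to the original S3-backed tables:

    # Hypothetical workaround: write out the join result and re-read it,
    # breaking the lineage to the underlying S3-backed Hive tables.
    table3.write.parquet("s3n://my-bucket/tmp/table3")   # illustrative path
    table3 = hiveContext.read.parquet("s3n://my-bucket/tmp/table3")
    table3.registerTempTable("table3")
    table3.cache()
    table3.count()

A self join on table3 should then reference only the parquet copy (and its cache), not the original tables.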