Hi,

I have a table that is read directly from an S3 location, and even a self
join on that cached table causes the data to be read from S3 again.

The query plan is shown below:

== Parsed Logical Plan ==
Aggregate [count(1) AS count#1804L]
 Project [user#0,programme_key#515]
  Join Inner, Some((programme_key#515 = programme_key#1802))
   Subquery left_table
    Subquery omniture
     Project [scv_id#4 AS user#0,programme_key#515]
      Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as string))) && (is_logged_in#8L = cast(1 as bigint))) && (is_4od_video_view#40L = cast(1 as bigint)))
       MetastoreRelation default, omnitureweb_log, None
   Subquery right_table
    Subquery omniture
     Project [scv_id#1291 AS user#771,programme_key#1802]
      Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (cast(20 as string))) && (is_logged_in#1295L = cast(1 as bigint))) && (is_4od_video_view#1327L = cast(1 as bigint)))
       MetastoreRelation default, omnitureweb_log, None

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#1804L]
 Project [user#0,programme_key#515]
  Join Inner, Some((programme_key#515 = programme_key#1802))
   Subquery left_table
    Subquery omniture
     Project [scv_id#4 AS user#0,programme_key#515]
      Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as string))) && (is_logged_in#8L = cast(1 as bigint))) && (is_4od_video_view#40L = cast(1 as bigint)))
       MetastoreRelation default, omnitureweb_log, None
   Subquery right_table
    Subquery omniture
     Project [scv_id#1291 AS user#771,programme_key#1802]
      Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (cast(20 as string))) && (is_logged_in#1295L = cast(1 as bigint))) && (is_4od_video_view#1327L = cast(1 as bigint)))
       MetastoreRelation default, omnitureweb_log, None

== Optimized Logical Plan ==
Aggregate [count(1) AS count#1804L]
 Project
  Join Inner, Some((programme_key#515 = programme_key#1802))
   Project [programme_key#515]
    InMemoryRelation [user#0,programme_key#515], true, 10000, StorageLevel(true, true, false, true, 1), (Project [scv_id#4 AS user#0,programme_key#515]), None
   Project [programme_key#1802]
    Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (20)) && (is_logged_in#1295L = 1)) && (is_4od_video_view#1327L = 1))
     MetastoreRelation default, omnitureweb_log, None

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#1804L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#1817L])
   TungstenProject
    SortMergeJoin [programme_key#515], [programme_key#1802]
     TungstenSort [programme_key#515 ASC], false, 0
      TungstenExchange hashpartitioning(programme_key#515)
       ConvertToUnsafe
        InMemoryColumnarTableScan [programme_key#515], (InMemoryRelation [user#0,programme_key#515], true, 10000, StorageLevel(true, true, false, true, 1), (Project [scv_id#4 AS user#0,programme_key#515]), None)
     TungstenSort [programme_key#1802 ASC], false, 0
      TungstenExchange hashpartitioning(programme_key#1802)
       ConvertToUnsafe
        Project [programme_key#1802]
         Filter ((is_logged_in#1295L = 1) && (is_4od_video_view#1327L = 1))
          HiveTableScan [programme_key#1802,is_logged_in#1295L,is_4od_video_view#1327L], (MetastoreRelation default, omnitureweb_log, None), [hit_month#1289 IN (2015-11),hit_day#1290 IN (20)]

Code Generation: true
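
For reference, the query that produced this plan was along these lines
(reconstructed from the plan, so the exact SQL is an approximation):

hiveContext.sql("""
    select count(*)
    from omniture left_table
    join omniture right_table
      on left_table.programme_key = right_table.programme_key
""").explain(True)

The tell-tale part is in the optimized plan: the left side of the join
resolves to the InMemoryRelation (the cache), but the right side falls back
to the MetastoreRelation and is read again from S3 via a HiveTableScan.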



Regards,
Gourav

On Fri, Dec 18, 2015 at 8:55 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> I think people have reported the same issue elsewhere, and this
> should be registered as a bug in Spark:
>
> https://forums.databricks.com/questions/2142/self-join-in-spark-sql.html
>
>
> Regards,
> Gourav
>
> On Thu, Dec 17, 2015 at 10:52 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi Ted,
>>
>> The self join works fine when the HiveContext tables are registered
>> directly from Hive tables, e.g.:
>>
>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>> table1.registerTempTable("table1")
>> table1.cache()
>> table1.count()
>>
>> and if I do a self join on table1, things are fine: the data comes from
>> the cache.
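>>
>> For example (with columnA as the join key), the plan for the self join
>> should show InMemoryColumnarTableScan on both sides:
>>
>> hiveContext.sql("select a.columnA from table1 a join table1 b "
>>                 "on a.columnA = b.columnA").explain()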
>>
>> But when we have something like this:
>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>> table1.registerTempTable("table1")
>> table1.cache()
>> table1.count()
>>
>> table2 = hiveContext.sql("select columnA, columnB from hivetable2")
>> table2.registerTempTable("table2")
>> table2.cache()
>> table2.count()
>>
>> table3 = hiveContext.sql("select table1.* from table1 join table2 on
>> table1.columnA = table2.columnA")
>> table3.registerTempTable("table3")
>> table3.cache()
>> table3.count()
>>
>>
>> then the self join on table3 does not take data from the table3 cache,
>> nor from the table1 or table2 caches, but instead reads the data directly
>> from S3 - which, as you would understand, does not make any sense.
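>>
>> A possible workaround (an untested sketch, with a hypothetical path):
>> write table3 out and read it back, so that the re-read DataFrame carries
>> no lineage back to the Hive tables, and cache that instead:
>>
>> table3.write.parquet("s3://some-bucket/tmp/table3")
>> table4 = hiveContext.read.parquet("s3://some-bucket/tmp/table3")
>> table4.registerTempTable("table4")
>> table4.cache()
>> table4.count()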
>>
>>
>> Regards,
>> Gourav
>>
>>
>>
>>
>>
>> On Wed, Dec 16, 2015 at 7:16 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> I did the following exercise in spark-shell ("c" is a cached table):
>>>
>>> scala> sqlContext.sql("select x.b from c x join c y on x.a = y.a").explain
>>> == Physical Plan ==
>>> Project [b#4]
>>> +- BroadcastHashJoin [a#3], [a#125], BuildRight
>>>    :- InMemoryColumnarTableScan [b#4,a#3], InMemoryRelation [a#3,b#4,c#5], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, Some(c)
>>>    +- InMemoryColumnarTableScan [a#125], InMemoryRelation [a#125,b#126,c#127], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, Some(c)
>>>
>>> scala> sqlContext.sql("select x.b, y.c from c x join c y on x.a = y.a").registerTempTable("d")
>>> scala> sqlContext.cacheTable("d")
>>>
>>> scala> sqlContext.sql("select x.b from d x join d y on x.c = y.c").explain
>>> == Physical Plan ==
>>> Project [b#4]
>>> +- SortMergeJoin [c#90], [c#253]
>>>    :- Sort [c#90 ASC], false, 0
>>>    :  +- TungstenExchange hashpartitioning(c#90,200), None
>>>    :     +- InMemoryColumnarTableScan [b#4,c#90], InMemoryRelation [b#4,c#90], true, 10000, StorageLevel(true, true, false, true, 1), Project [b#4,c#90], Some(d)
>>>    +- Sort [c#253 ASC], false, 0
>>>       +- TungstenExchange hashpartitioning(c#253,200), None
>>>          +- InMemoryColumnarTableScan [c#253], InMemoryRelation [b#246,c#253], true, 10000, StorageLevel(true, true, false, true, 1), Project [b#4,c#90], Some(d)
>>>
>>> Is the above what you observed?
>>>
>>> Cheers
>>>
>>> On Wed, Dec 16, 2015 at 9:34 AM, Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> This is how the issue can be reproduced:
>>>>
>>>> 1. TableA: cached()
>>>> 2. TableB: cached()
>>>> 3. TableC: TableA inner join TableB, then cached()
>>>> 4. TableC join TableC does not take the data from the cache but starts
>>>> reading the data for TableA and TableB from disk.
>>>>
>>>> Does this sound like a bug? The self join between TableA and TableB is
>>>> working fine and taking data from the cache.
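>>>>
>>>> A minimal sketch of the steps above (table and column names are made
>>>> up; columnA is the join key):
>>>>
>>>> tableA = hiveContext.table("hive_table_a").cache()
>>>> tableB = hiveContext.table("hive_table_b").cache()
>>>> tableA.count(); tableB.count()   # actions to materialise both caches
>>>> tableC = tableA.join(tableB, "columnA").cache()
>>>> tableC.count()                   # materialise TableC's cache
>>>> # explain() shows whether the join reads InMemoryColumnarTableScan
>>>> # (the cache) or HiveTableScan (disk/S3)
>>>> tableC.join(tableC, "columnA").explain()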
>>>>
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>
>>>
>>
>
