Hi,
I have a table which is read directly from an S3 location, and even a self
join on that cached table causes the data to be read from S3 again.
The query plan is mentioned below:
== Parsed Logical Plan ==
Aggregate [count(1) AS count#1804L]
Project [user#0,programme_key#515]
Join Inner, Some((programme_key#515 = programme_key#1802))
Subquery left_table
Subquery omniture
Project [scv_id#4 AS user#0,programme_key#515]
Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as
string))) && (is_logged_in#8L = cast(1 as bigint))) &&
(is_4od_video_view#40L = cast(1 as bigint)))
MetastoreRelation default, omnitureweb_log, None
Subquery right_table
Subquery omniture
Project [scv_id#1291 AS user#771,programme_key#1802]
Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN
(cast(20 as string))) && (is_logged_in#1295L = cast(1 as bigint))) &&
(is_4od_video_view#1327L = cast(1 as bigint)))
MetastoreRelation default, omnitureweb_log, None
== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#1804L]
Project [user#0,programme_key#515]
Join Inner, Some((programme_key#515 = programme_key#1802))
Subquery left_table
Subquery omniture
Project [scv_id#4 AS user#0,programme_key#515]
Filter (((hit_month#2 IN (2015-11) && hit_day#3 IN (cast(20 as
string))) && (is_logged_in#8L = cast(1 as bigint))) &&
(is_4od_video_view#40L = cast(1 as bigint)))
MetastoreRelation default, omnitureweb_log, None
Subquery right_table
Subquery omniture
Project [scv_id#1291 AS user#771,programme_key#1802]
Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN
(cast(20 as string))) && (is_logged_in#1295L = cast(1 as bigint))) &&
(is_4od_video_view#1327L = cast(1 as bigint)))
MetastoreRelation default, omnitureweb_log, None
== Optimized Logical Plan ==
Aggregate [count(1) AS count#1804L]
Project
Join Inner, Some((programme_key#515 = programme_key#1802))
Project [programme_key#515]
InMemoryRelation [user#0,programme_key#515], true, 10000,
StorageLevel(true, true, false, true, 1), (Project [scv_id#4 AS
user#0,programme_key#515]), None
Project [programme_key#1802]
Filter (((hit_month#1289 IN (2015-11) && hit_day#1290 IN (20)) &&
(is_logged_in#1295L = 1)) && (is_4od_video_view#1327L = 1))
MetastoreRelation default, omnitureweb_log, None
== Physical Plan ==
TungstenAggregate(key=[],
functions=[(count(1),mode=Final,isDistinct=false)],
output=[count#1804L])
TungstenExchange SinglePartition
TungstenAggregate(key=[],
functions=[(count(1),mode=Partial,isDistinct=false)],
output=[currentCount#1817L])
TungstenProject
SortMergeJoin [programme_key#515], [programme_key#1802]
TungstenSort [programme_key#515 ASC], false, 0
TungstenExchange hashpartitioning(programme_key#515)
ConvertToUnsafe
InMemoryColumnarTableScan [programme_key#515],
(InMemoryRelation [user#0,programme_key#515], true, 10000,
StorageLevel(true, true, false, true, 1), (Project [scv_id#4 AS
user#0,programme_key#515]), None)
TungstenSort [programme_key#1802 ASC], false, 0
TungstenExchange hashpartitioning(programme_key#1802)
ConvertToUnsafe
Project [programme_key#1802]
Filter ((is_logged_in#1295L = 1) && (is_4od_video_view#1327L = 1))
HiveTableScan
[programme_key#1802,is_logged_in#1295L,is_4od_video_view#1327L],
(MetastoreRelation default, omnitureweb_log, None), [hit_month#1289 IN
(2015-11),hit_day#1290 IN (20)]
Code Generation: true
Regards,
Gourav
On Fri, Dec 18, 2015 at 8:55 AM, Gourav Sengupta <[email protected]>
wrote:
> hi,
>
> I think that people have reported the same issue elsewhere, and this
> should be registered as a bug in Spark:
>
> https://forums.databricks.com/questions/2142/self-join-in-spark-sql.html
>
>
> Regards,
> Gourav
>
> On Thu, Dec 17, 2015 at 10:52 AM, Gourav Sengupta <
> [email protected]> wrote:
>
>> Hi Ted,
>>
>> The self join works fine on tables where the HiveContext tables are
>> direct Hive tables, for example:
>>
>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>> table1.registerTempTable("table1")
>> table1.cache()
>> table1.count()
>>
>> and if I do a self join on table1, things work fine.
>>
>> But in case we have something like this:
>> table1 = hiveContext.sql("select columnA, columnB from hivetable1")
>> table1.registerTempTable("table1")
>> table1.cache()
>> table1.count()
>>
>> table2 = hiveContext.sql("select columnA, columnB from hivetable2")
>> table2.registerTempTable("table2")
>> table2.cache()
>> table2.count()
>>
>> table3 = hiveContext.sql("select table1.* from table1, table2 where
>> table1.columnA = table2.columnA")
>> table3.registerTempTable("table3")
>> table3.cache()
>> table3.count()
>>
>>
>> then the self join on table3 does not take data from the table3 cache,
>> nor from the table1 or table2 caches, but starts reading data directly
>> from S3 - which, as you would understand, does not make any sense.
>>
>>
>> Regards,
>> Gourav
>>
>>
>>
>>
>>
>> On Wed, Dec 16, 2015 at 7:16 PM, Ted Yu <[email protected]> wrote:
>>
>>> I did the following exercise in spark-shell ("c" is a cached table):
>>>
>>> scala> sqlContext.sql("select x.b from c x join c y on x.a =
>>> y.a").explain
>>> == Physical Plan ==
>>> Project [b#4]
>>> +- BroadcastHashJoin [a#3], [a#125], BuildRight
>>> :- InMemoryColumnarTableScan [b#4,a#3], InMemoryRelation
>>> [a#3,b#4,c#5], true, 10000, StorageLevel(true, true, false, true, 1),
>>> ConvertToUnsafe, Some(c)
>>> +- InMemoryColumnarTableScan [a#125], InMemoryRelation
>>> [a#125,b#126,c#127], true, 10000, StorageLevel(true, true, false, true, 1),
>>> ConvertToUnsafe, Some(c)
>>>
>>> sqlContext.sql("select x.b, y.c from c x join c y on x.a =
>>> y.a").registerTempTable("d")
>>> scala> sqlContext.cacheTable("d")
>>>
>>> scala> sqlContext.sql("select x.b from d x join d y on x.c =
>>> y.c").explain
>>> == Physical Plan ==
>>> Project [b#4]
>>> +- SortMergeJoin [c#90], [c#253]
>>> :- Sort [c#90 ASC], false, 0
>>> : +- TungstenExchange hashpartitioning(c#90,200), None
>>> : +- InMemoryColumnarTableScan [b#4,c#90], InMemoryRelation
>>> [b#4,c#90], true, 10000, StorageLevel(true, true, false, true, 1), Project
>>> [b#4,c#90], Some(d)
>>> +- Sort [c#253 ASC], false, 0
>>> +- TungstenExchange hashpartitioning(c#253,200), None
>>> +- InMemoryColumnarTableScan [c#253], InMemoryRelation
>>> [b#246,c#253], true, 10000, StorageLevel(true, true, false, true, 1),
>>> Project [b#4,c#90], Some(d)
>>>
>>> Is the above what you observed?
>>>
>>> Cheers
>>>
>>> On Wed, Dec 16, 2015 at 9:34 AM, Gourav Sengupta <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> This is how the data can be created:
>>>>
>>>> 1. TableA : cached()
>>>> 2. TableB : cached()
>>>> 3. TableC: TableA inner join TableB cached()
>>>> 4. TableC join TableC does not take the data from cache but starts
>>>> reading the data for TableA and TableB from disk.
>>>>
>>>> Does this sound like a bug? The join between TableA and TableB is
>>>> working fine and taking data from the cache.
>>>>
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>
>>>
>>
>