[ https://issues.apache.org/jira/browse/SPARK-16327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tao Li updated SPARK-16327:
---------------------------
    Description: 
I found that the two subqueries in a join operator are not executed in parallel. For example, subqueries t1 and t2 in the query below run one after the other instead of concurrently.
{code:SQL|title=test.sql|borderStyle=solid}
select t1.channel_l1 as channel, t1.pv as pv1, t2.pv as pv2
from (select channel_l1, count(*) as pv from custom.common_wap_pv where logdate>='2016060605' and logdate<='2016060605' group by channel_l1) t1
join (select channel_l1, count(*) as pv from custom.common_wap_pv where logdate>='2016060606' and logdate<='2016060606' group by channel_l1) t2
on t1.channel_l1 = t2.channel_l1
{code}
Physical Plan:
{code}
== Physical Plan ==
Limit 21
+- ConvertToSafe
   +- Project [channel_l1#58 AS channel#2,pv#0L AS pv1#3L,pv#1L AS pv2#4L]
      +- SortMergeJoin [channel_l1#58], [channel_l1#132]
         :- Sort [channel_l1#58 ASC], false, 0
         :  +- TungstenExchange(coordinator id: 182426297) hashpartitioning(channel_l1#58,200), Some(coordinator[target post-shuffle partition size: 67108864])
         :     +- TungstenAggregate(key=[channel_l1#58], functions=[(count(1),mode=Final,isDistinct=false)], output=[channel_l1#58,pv#0L])
         :        +- TungstenExchange(coordinator id: 1254877490) hashpartitioning(channel_l1#58,200), Some(coordinator[target post-shuffle partition size: 67108864])
         :           +- TungstenAggregate(key=[channel_l1#58], functions=[(count(1),mode=Partial,isDistinct=false)], output=[channel_l1#58,count#155L])
         :              +- HiveTableScan [channel_l1#58], MetastoreRelation custom, common_wap_pv, None, [(logdate#5 >= 2016060605),(logdate#5 <= 2016060605)]
         +- Sort [channel_l1#132 ASC], false, 0
            +- TungstenExchange(coordinator id: 1199219058) hashpartitioning(channel_l1#132,200), Some(coordinator[target post-shuffle partition size: 67108864])
               +- TungstenAggregate(key=[channel_l1#132], functions=[(count(1),mode=Final,isDistinct=false)], output=[channel_l1#132,pv#1L])
                  +- TungstenExchange(coordinator id: 1391280854) hashpartitioning(channel_l1#132,200), Some(coordinator[target post-shuffle partition size: 67108864])
                     +- TungstenAggregate(key=[channel_l1#132], functions=[(count(1),mode=Partial,isDistinct=false)], output=[channel_l1#132,count#158L])
                        +- HiveTableScan [channel_l1#132], MetastoreRelation custom, common_wap_pv, None, [(logdate#79 >= 2016060606),(logdate#79 <= 2016060606)]
{code}
The two subqueries are not executed in parallel:
!http://7xid4y.com1.z0.glb.clouddn.com/demo.png!

  was:
I found that the two subqueries in a join operator are not executed in parallel. For example, subqueries t1 and t2 in the query below run one after the other instead of concurrently.
{code:SQL|title=test.sql|borderStyle=solid}
select t1.channel_l1 as channel, t1.pv as pv1, t2.pv as pv2
from (select channel_l1, count(*) as pv from custom.common_wap_pv where logdate>='2016060605' and logdate<='2016060605' group by channel_l1) t1
join (select channel_l1, count(*) as pv from custom.common_wap_pv where logdate>='2016060606' and logdate<='2016060606' group by channel_l1) t2
on t1.channel_l1 = t2.channel_l1
{code}
Physical Plan:
{code}
== Physical Plan ==
Limit 21
+- ConvertToSafe
   +- Project [channel_l1#1041 AS channel#985,pv#983L AS pv1#986L,pv#984L AS pv2#987L]
      +- CartesianProduct
         :- ConvertToSafe
         :  +- TungstenAggregate(key=[channel_l1#1041], functions=[(count(1),mode=Final,isDistinct=false)], output=[channel_l1#1041,pv#983L])
         :     +- TungstenExchange(coordinator id: 1741186649) hashpartitioning(channel_l1#1041,200), Some(coordinator[target post-shuffle partition size: 67108864])
         :        +- TungstenAggregate(key=[channel_l1#1041], functions=[(count(1),mode=Partial,isDistinct=false)], output=[channel_l1#1041,count#1138L])
         :           +- HiveTableScan [channel_l1#1041], MetastoreRelation custom, common_wap_pv, None, [(logdate#988 >= 2016060605),(logdate#988 <= 2016060605)]
         +- ConvertToSafe
            +- Project [pv#984L]
               +- TungstenAggregate(key=[channel_l1#1115], functions=[(count(1),mode=Final,isDistinct=false)], output=[pv#984L])
                  +- TungstenExchange(coordinator id: 842269542) hashpartitioning(channel_l1#1115,200), Some(coordinator[target post-shuffle partition size: 67108864])
                     +- TungstenAggregate(key=[channel_l1#1115], functions=[(count(1),mode=Partial,isDistinct=false)], output=[channel_l1#1115,count#1141L])
                        +- HiveTableScan [channel_l1#1115], MetastoreRelation custom, common_wap_pv, None, [(logdate#1062 >= 2016060606),(logdate#1062 <= 2016060606)]
{code}
The two subqueries are not executed in parallel:
!http://7xid4y.com1.z0.glb.clouddn.com/demo.png!

> The two subqueries in a join operator can't be executed in parallel
> -------------------------------------------------------------------
>
>                 Key: SPARK-16327
>                 URL: https://issues.apache.org/jira/browse/SPARK-16327
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1, 1.6.2
>         Environment: Spark: 1.6.2
> Hadoop: 2.5.0-cdh5.3.2
> Hive: 0.14.0
>            Reporter: Tao Li
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)