Hi Yin,

Thanks for the detailed explanation. I have one more question about the correlation optimizer. When I ran EXPLAIN on query 17, I got the plan for Stage-1, where the bulk of the time goes. I understand what is happening in the map phase, but the reduce phase confuses me once the optimizer kicks in, mainly because I see several Join Operators and I can't find any join between the part and lineitem tables.
I see that in the reduce phase there is a global aggregation on avg(l_quantity), followed by 3 Join Operators. Why are there 3 joins? Also, what exactly are the Demux and Mux Operators? Are they related to the correlation optimizer? The EXPLAIN output is attached, and the query is the following:

select sum(l_extendedprice) / 7.0 as avg_yearly
from (
  select l_quantity, l_extendedprice, t_avg_quantity
  from (
    select l_partkey as t_partkey, 0.2 * avg(l_quantity) as t_avg_quantity
    from lineitem
    group by l_partkey
  ) t
  join (
    select l_quantity, l_partkey, l_extendedprice
    from part p join lineitem l
      on p.p_partkey = l.l_partkey
      and p.p_brand = 'Brand#23'
      and p.p_container = 'MED BOX'
  ) l1 on l1.l_partkey = t.t_partkey
) a
where l_quantity < t_avg_quantity;

Thanks,
Avrilia

On Tue, Dec 10, 2013 at 7:38 PM, Yin Huai <huaiyin....@gmail.com> wrote:
> Hi Avrilia,
>
> It is caused by the distinct aggregations in TPC-H Q21. Because Hive adds
> the distinct columns to the key columns of ReduceSinkOperators, and the
> correlation optimizer currently only checks for exactly the same key
> columns, this query will not be optimized. The JIRA for this issue is
> https://issues.apache.org/jira/browse/HIVE-4751. If you remove distinct
> from those aggregation functions, you will see the optimized plan.
> Another kind of case that the correlation optimizer does not currently
> optimize is when a table is used in multiple MR jobs but its rows are
> shuffled in different ways.
>
> Thanks,
>
> Yin
>
>
> On Tue, Dec 10, 2013 at 8:05 PM, Avrilia Floratou <avrilia.flora...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm running TPC-H query 21 on Hive 0.12 and have enabled
>> hive.optimize.correlation.
>> I could see the effect of the correlation optimizer on query 17, but when
>> running query 21 I don't see the optimizer being used. I used the
>> publicly available TPC-H queries for Hive and merged all the intermediate
>> subqueries into one for Q21. In this query there is a correlation
>> between multiple subqueries, since they all read lineitem as input. But
>> what I observe from the query plan and the execution of the query is that
>> the subqueries are executed one by one and their results are materialized
>> before the joins among them are executed. Is there any other parameter
>> that I need to set to make this work?
>>
>> Thanks,
>> Avrilia
>>
>
>
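To make Yin's explanation concrete, here is a minimal Python sketch of the matching rule as the reply describes it: two shuffles are treated as correlated only when their key columns are exactly the same, and a distinct aggregation appends its distinct columns to the group-by keys. The function names and the column choices (l_orderkey, l_suppkey, l_partkey) are hypothetical illustrations, not Hive's actual API and not the exact text of Q21.

```python
from typing import Sequence, Tuple

# Hypothetical model: a shuffle (ReduceSinkOperator) is reduced to the
# ordered tuple of its key columns. This is not Hive's API.
Keys = Tuple[str, ...]


def reduce_sink_keys(group_by: Sequence[str], distinct_cols: Sequence[str] = ()) -> Keys:
    """Key columns of a group-by shuffle; distinct columns are appended to the keys."""
    return tuple(group_by) + tuple(distinct_cols)


def correlated(a: Keys, b: Keys) -> bool:
    """Simplified rule from the reply: shuffles can be merged only if key columns match exactly."""
    return a == b


# A join on lineitem that shuffles on l_orderkey.
join_keys: Keys = ("l_orderkey",)

# group by l_orderkey with plain aggregates: same keys, so the jobs can share a shuffle.
plain_agg = reduce_sink_keys(["l_orderkey"])

# group by l_orderkey with count(distinct l_suppkey): l_suppkey is appended to the keys.
distinct_agg = reduce_sink_keys(["l_orderkey"], ["l_suppkey"])

# The same table shuffled on a different column (Yin's second case).
other_shuffle: Keys = ("l_partkey",)

print(correlated(join_keys, plain_agg))      # True
print(correlated(join_keys, distinct_agg))   # False
print(correlated(join_keys, other_shuffle))  # False
```

Removing the distinct column, as Yin suggests, makes the two key tuples equal again, which is why the optimized plan then appears.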
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME lineitem))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL l_partkey) t_partkey) (TOK_SELEXPR (* 0.2 (TOK_FUNCTION avg (TOK_TABLE_OR_COL l_quantity))) t_avg_quantity)) (TOK_GROUPBY (TOK_TABLE_OR_COL l_partkey)))) t) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME part) p) (TOK_TABREF (TOK_TABNAME lineitem) l) (and (and (= (. (TOK_TABLE_OR_COL p) p_partkey) (. (TOK_TABLE_OR_COL l) l_partkey)) (= (. (TOK_TABLE_OR_COL p) p_brand) 'Brand#23')) (= (. (TOK_TABLE_OR_COL p) p_container) 'MED BOX')))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL l_quantity)) (TOK_SELEXPR (TOK_TABLE_OR_COL l_partkey)) (TOK_SELEXPR (TOK_TABLE_OR_COL l_extendedprice))))) l1) (= (. (TOK_TABLE_OR_COL l1) l_partkey) (. (TOK_TABLE_OR_COL t) t_partkey)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL l_quantity)) (TOK_SELEXPR (TOK_TABLE_OR_COL l_extendedprice)) (TOK_SELEXPR (TOK_TABLE_OR_COL t_avg_quantity))))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (/ (TOK_FUNCTION sum (TOK_TABLE_OR_COL l_extendedprice)) 7.0) avg_yearly)) (TOK_WHERE (< (TOK_TABLE_OR_COL l_quantity) (TOK_TABLE_OR_COL t_avg_quantity)))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        a:l1:l
          TableScan
            alias: l
            Reduce Output Operator
              key expressions:
                    expr: l_partkey
                    type: int
              sort order: +
              Map-reduce partition columns:
                    expr: l_partkey
                    type: int
              tag: 1
              value expressions:
                    expr: l_partkey
                    type: int
                    expr: l_quantity
                    type: double
                    expr: l_extendedprice
                    type: double
        a:l1:p
          TableScan
            alias: p
            Filter Operator
              predicate:
                  expr: ((p_brand = 'Brand#23') and (p_container = 'MED BOX'))
                  type: boolean
              Reduce Output Operator
                key expressions:
                      expr: p_partkey
                      type: int
                sort order: +
                Map-reduce partition columns:
                      expr: p_partkey
                      type: int
                tag: 2
        a:t:lineitem
          TableScan
            alias: lineitem
            Select Operator
              expressions:
                    expr: l_partkey
                    type: int
                    expr: l_quantity
                    type: double
              outputColumnNames: l_partkey, l_quantity
              Group By Operator
                aggregations:
                      expr: avg(l_quantity)
                bucketGroup: false
                keys:
                      expr: l_partkey
                      type: int
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: int
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: int
                  tag: 0
                  value expressions:
                        expr: _col1
                        type: struct<count:bigint,sum:double>
      Reduce Operator Tree:
        Demux Operator
          Group By Operator
            aggregations:
                  expr: avg(VALUE._col0)
            bucketGroup: false
            keys:
                  expr: KEY._col0
                  type: int
            mode: mergepartial
            outputColumnNames: _col0, _col1
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: (0.2 * _col1)
                    type: double
              outputColumnNames: _col0, _col1
              Mux Operator
                Join Operator
                  condition map:
                       Inner Join 0 to 1
                  condition expressions:
                    0 {VALUE._col1}
                    1 {VALUE._col0} {VALUE._col2}
                  handleSkewJoin: false
                  outputColumnNames: _col1, _col2, _col4
                  Filter Operator
                    predicate:
                        expr: (_col2 < _col1)
                        type: boolean
                    Select Operator
                      expressions:
                            expr: _col4
                            type: double
                      outputColumnNames: _col1
                      Group By Operator
                        aggregations:
                              expr: sum(_col1)
                        bucketGroup: false
                        mode: hash
                        outputColumnNames: _col0
                        File Output Operator
                          compressed: false
                          GlobalTableId: 0
                          table:
                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
          Join Operator
            condition map:
                 Inner Join 0 to 1
            condition expressions:
              0
              1 {VALUE._col1} {VALUE._col4} {VALUE._col5}
            handleSkewJoin: false
            outputColumnNames: _col12, _col15, _col16
            Select Operator
              expressions:
                    expr: _col15
                    type: double
                    expr: _col12
                    type: int
                    expr: _col16
                    type: double
              outputColumnNames: _col0, _col1, _col2
              Mux Operator
                Join Operator
                  condition map:
                       Inner Join 0 to 1
                  condition expressions:
                    0 {VALUE._col1}
                    1 {VALUE._col0} {VALUE._col2}
                  handleSkewJoin: false
                  outputColumnNames: _col1, _col2, _col4
                  Filter Operator
                    predicate:
                        expr: (_col2 < _col1)
                        type: boolean
                    Select Operator
                      expressions:
                            expr: _col4
                            type: double
                      outputColumnNames: _col1
                      Group By Operator
                        aggregations:
                              expr: sum(_col1)
                        bucketGroup: false
                        mode: hash
                        outputColumnNames: _col0
                        File Output Operator
                          compressed: false
                          GlobalTableId: 0
                          table:
                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://lassen22.almaden.ibm.com:9100/tmp/hive-aflorat/hive_2013-12-10_19-12-01_556_5005234550872754317-1/-mr-10002
            Reduce Output Operator
              sort order:
              tag: -1
              value expressions:
                    expr: _col0
                    type: double
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: sum(VALUE._col0)
          bucketGroup: false
          mode: mergepartial
          outputColumnNames: _col0
          Select Operator
            expressions:
                  expr: (_col0 / 7.0)
                  type: double
            outputColumnNames: _col0
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
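The data flow of the Stage-1 plan above can be sketched as a toy Python simulation, assuming a single mapper, a single reducer, and made-up rows. The three Reduce Output Operators share one shuffle keyed on partkey and differ only in their tags (0, 1, 2); the sketch routes each row by tag to either the avg(l_quantity) branch or the part/lineitem join (the role the Demux Operator plays in the plan), and both branches then feed the outer join (the role of the Mux Operators). The names LINEITEM, PART, stage1_map, stage1_reduce and stage2 are hypothetical, and this is not how Hive's Demux or Mux Operators are actually implemented; a real reducer streams key-sorted rows rather than buffering whole groups.

```python
from collections import defaultdict
from typing import Dict, Iterator, List, Tuple

# Hypothetical toy data, limited to the columns Q17 touches.
LINEITEM = [  # (l_partkey, l_quantity, l_extendedprice)
    (1, 1.0, 100.0), (1, 10.0, 900.0), (2, 2.0, 50.0), (2, 30.0, 70.0),
]
PART = [  # (p_partkey, p_brand, p_container)
    (1, "Brand#23", "MED BOX"), (2, "Brand#12", "SM BOX"),
]

Row = Tuple[int, int, tuple]  # (shuffle key, tag, value)


def stage1_map() -> Iterator[Row]:
    """Mimic the three Reduce Output Operators: one shuffle on partkey, rows tagged by source."""
    partial: Dict[int, List[float]] = defaultdict(lambda: [0, 0.0])
    for pk, qty, _ in LINEITEM:            # a:t:lineitem, map-side hash avg state
        partial[pk][0] += 1
        partial[pk][1] += qty
    for pk, (count, total) in partial.items():
        yield pk, 0, (count, total)        # tag 0: partial avg (count, sum)
    for pk, qty, price in LINEITEM:        # a:l1:l
        yield pk, 1, (qty, price)          # tag 1
    for pk, brand, container in PART:      # a:l1:p, after the Filter Operator
        if brand == "Brand#23" and container == "MED BOX":
            yield pk, 2, ()                # tag 2


def stage1_reduce(rows: Iterator[Row]) -> float:
    """Mimic one reducer: route rows by tag, then feed both branches into the outer join."""
    groups: Dict[int, Dict[int, list]] = defaultdict(lambda: {0: [], 1: [], 2: []})
    for key, tag, value in rows:           # the shuffle groups rows by key
        groups[key][tag].append(value)
    partial_sum = 0.0                      # reduce-side hash Group By sum(...)
    for by_tag in groups.values():
        # tag 0 -> Group By (mergepartial avg) -> Select (0.2 * avg)
        t_avg = []
        if by_tag[0]:
            count = sum(c for c, _ in by_tag[0])
            total = sum(s for _, s in by_tag[0])
            t_avg.append(0.2 * total / count)
        # tags 1 and 2 -> inner join of lineitem rows with the filtered part rows
        l1 = [(qty, price) for qty, price in by_tag[1] for _ in by_tag[2]]
        # both branches -> outer join (t with l1) -> Filter (l_quantity < t_avg_quantity)
        for avg_q in t_avg:
            partial_sum += sum(price for qty, price in l1 if qty < avg_q)
    return partial_sum


def stage2(partial_sums: List[float]) -> float:
    """Mimic Stage-2: merge the per-reducer partial sums, then divide by 7.0."""
    return sum(partial_sums) / 7.0


avg_yearly = stage2([stage1_reduce(stage1_map())])
print(avg_yearly)
```

With this toy data only partkey 1 passes the part filter, and only its row with l_quantity 1.0 falls below 0.2 * avg(l_quantity) = 1.1, so the sketch computes 100.0 / 7.0. The sketch has one inner join and one outer join; in the plan, both Mux Operators feed a Join Operator with identical condition expressions.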