Hi Yin,

Thanks for the detailed explanation. I have one more question about the
correlation optimizer. When I ran explain on query 17, I got the plan for
stage 1, where the bulk of the time goes. I can understand what is happening
in the map phase, but the reduce phase confuses me when the optimizer kicks
in, mainly because I see many join operators and I can't find any join
between the part and lineitem tables.

I see that in the reduce phase there is a global aggregation on
avg(l_quantity) and then I can see 3 join operators. Why are there 3 joins?
Also what exactly are the demux and mux operators? Are they related to the
correlation optimizer or not?

The explain output is attached and the query is the following:

select
  sum(l_extendedprice) / 7.0 as avg_yearly
from
  (select l_quantity, l_extendedprice, t_avg_quantity from
   (select
       l_partkey as t_partkey, 0.2 * avg(l_quantity) as t_avg_quantity
     from lineitem
     group by l_partkey
     ) t join
     (select
        l_quantity, l_partkey, l_extendedprice
      from
        part p join lineitem l
        on
          p.p_partkey = l.l_partkey
          and p.p_brand = 'Brand#23'
          and p.p_container = 'MED BOX'
      ) l1 on l1.l_partkey = t.t_partkey
   ) a
where l_quantity < t_avg_quantity;
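
One way to check whether the Demux and Mux operators come from the
correlation optimizer would be to turn the setting off and compare the two
plans. A minimal sketch, assuming the same session and the same TPC-H tables
as above; hive.optimize.correlation is the only setting changed:

```sql
-- Disable the correlation optimizer for this session only.
set hive.optimize.correlation=false;

-- Re-run EXPLAIN on the same query 17 text as above.
explain
select
  sum(l_extendedprice) / 7.0 as avg_yearly
from
  (select l_quantity, l_extendedprice, t_avg_quantity from
   (select
       l_partkey as t_partkey, 0.2 * avg(l_quantity) as t_avg_quantity
     from lineitem
     group by l_partkey
     ) t join
     (select
        l_quantity, l_partkey, l_extendedprice
      from
        part p join lineitem l
        on
          p.p_partkey = l.l_partkey
          and p.p_brand = 'Brand#23'
          and p.p_container = 'MED BOX'
      ) l1 on l1.l_partkey = t.t_partkey
   ) a
where l_quantity < t_avg_quantity;
```

If the Demux and Mux operators no longer appear in that plan, that would
suggest they are introduced by the optimizer.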


Thanks,
Avrilia



On Tue, Dec 10, 2013 at 7:38 PM, Yin Huai <huaiyin....@gmail.com> wrote:

> Hi Avrilia,
>
> It is caused by the distinct aggregations in TPC-H Q21. Because Hive adds
> those distinct columns to the key columns of ReduceSinkOperators and the
> correlation optimizer only checks for exactly the same key columns right
> now, this query will not be optimized. The JIRA for this issue is
> https://issues.apache.org/jira/browse/HIVE-4751. If you remove distinct
> from those aggregation functions, you will see the optimized plan. Also,
> another kind of case that the correlation optimizer does not optimize
> right now is when a table is used in multiple MR jobs but the rows of this
> table are shuffled in different ways.
>
> Thanks,
>
> Yin
>
>
> On Tue, Dec 10, 2013 at 8:05 PM, Avrilia Floratou <
> avrilia.flora...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm running TPC-H query 21 on Hive 0.12 and have enabled
>> hive.optimize.correlation.
>> I could see the effect of the correlation optimizer on query 17 but when
>> running query 21 I don't actually see the optimizer being used. I used the
>> publicly available tpc-h queries for hive and merged all the intermediate
>> subqueries into one for Q21. In this query there is a correlation
>> between multiple subqueries since they all get lineitem as input. But what
>> I observe from the query plan and the execution of the query is that the
>> subqueries are executed one by one and their results are materialized
>> before the joins among them are executed. Is there any other parameter that
>> I need to set to make this work?
>>
>> Thanks,
>> Avrilia
>>
>
>
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN 
(TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME lineitem))) 
(TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR 
(TOK_TABLE_OR_COL l_partkey) t_partkey) (TOK_SELEXPR (* 0.2 (TOK_FUNCTION avg 
(TOK_TABLE_OR_COL l_quantity))) t_avg_quantity)) (TOK_GROUPBY (TOK_TABLE_OR_COL 
l_partkey)))) t) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF 
(TOK_TABNAME part) p) (TOK_TABREF (TOK_TABNAME lineitem) l) (and (and (= (. 
(TOK_TABLE_OR_COL p) p_partkey) (. (TOK_TABLE_OR_COL l) l_partkey)) (= (. 
(TOK_TABLE_OR_COL p) p_brand) 'Brand#23')) (= (. (TOK_TABLE_OR_COL p) 
p_container) 'MED BOX')))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) 
(TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL l_quantity)) (TOK_SELEXPR 
(TOK_TABLE_OR_COL l_partkey)) (TOK_SELEXPR (TOK_TABLE_OR_COL 
l_extendedprice))))) l1) (= (. (TOK_TABLE_OR_COL l1) l_partkey) (. 
(TOK_TABLE_OR_COL t) t_partkey)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR 
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL l_quantity)) 
(TOK_SELEXPR (TOK_TABLE_OR_COL l_extendedprice)) (TOK_SELEXPR (TOK_TABLE_OR_COL 
t_avg_quantity))))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) 
(TOK_SELECT (TOK_SELEXPR (/ (TOK_FUNCTION sum (TOK_TABLE_OR_COL 
l_extendedprice)) 7.0) avg_yearly)) (TOK_WHERE (< (TOK_TABLE_OR_COL l_quantity) 
(TOK_TABLE_OR_COL t_avg_quantity)))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        a:l1:l 
          TableScan
            alias: l
            Reduce Output Operator
              key expressions:
                    expr: l_partkey
                    type: int
              sort order: +
              Map-reduce partition columns:
                    expr: l_partkey
                    type: int
              tag: 1
              value expressions:
                    expr: l_partkey
                    type: int
                    expr: l_quantity
                    type: double
                    expr: l_extendedprice
                    type: double
        a:l1:p 
          TableScan
            alias: p
            Filter Operator
              predicate:
                  expr: ((p_brand = 'Brand#23') and (p_container = 'MED BOX'))
                  type: boolean
              Reduce Output Operator
                key expressions:
                      expr: p_partkey
                      type: int
                sort order: +
                Map-reduce partition columns:
                      expr: p_partkey
                      type: int
                tag: 2
        a:t:lineitem 
          TableScan
            alias: lineitem
            Select Operator
              expressions:
                    expr: l_partkey
                    type: int
                    expr: l_quantity
                    type: double
              outputColumnNames: l_partkey, l_quantity
              Group By Operator
                aggregations:
                      expr: avg(l_quantity)
                bucketGroup: false
                keys:
                      expr: l_partkey
                      type: int
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: int
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: int
                  tag: 0
                  value expressions:
                        expr: _col1
                        type: struct<count:bigint,sum:double>
      Reduce Operator Tree:
        Demux Operator
          Group By Operator
            aggregations:
                  expr: avg(VALUE._col0)
            bucketGroup: false
            keys:
                  expr: KEY._col0
                  type: int
            mode: mergepartial
            outputColumnNames: _col0, _col1
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: (0.2 * _col1)
                    type: double
              outputColumnNames: _col0, _col1
              Mux Operator
                Join Operator
                  condition map:
                       Inner Join 0 to 1
                  condition expressions:
                    0 {VALUE._col1}
                    1 {VALUE._col0} {VALUE._col2}
                  handleSkewJoin: false
                  outputColumnNames: _col1, _col2, _col4
                  Filter Operator
                    predicate:
                        expr: (_col2 < _col1)
                        type: boolean
                    Select Operator
                      expressions:
                            expr: _col4
                            type: double
                      outputColumnNames: _col1
                      Group By Operator
                        aggregations:
                              expr: sum(_col1)
                        bucketGroup: false
                        mode: hash
                        outputColumnNames: _col0
                        File Output Operator
                          compressed: false
                          GlobalTableId: 0
                          table:
                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
          Join Operator
            condition map:
                 Inner Join 0 to 1
            condition expressions:
              0 
              1 {VALUE._col1} {VALUE._col4} {VALUE._col5}
            handleSkewJoin: false
            outputColumnNames: _col12, _col15, _col16
            Select Operator
              expressions:
                    expr: _col15
                    type: double
                    expr: _col12
                    type: int
                    expr: _col16
                    type: double
              outputColumnNames: _col0, _col1, _col2
              Mux Operator
                Join Operator
                  condition map:
                       Inner Join 0 to 1
                  condition expressions:
                    0 {VALUE._col1}
                    1 {VALUE._col0} {VALUE._col2}
                  handleSkewJoin: false
                  outputColumnNames: _col1, _col2, _col4
                  Filter Operator
                    predicate:
                        expr: (_col2 < _col1)
                        type: boolean
                    Select Operator
                      expressions:
                            expr: _col4
                            type: double
                      outputColumnNames: _col1
                      Group By Operator
                        aggregations:
                              expr: sum(_col1)
                        bucketGroup: false
                        mode: hash
                        outputColumnNames: _col0
                        File Output Operator
                          compressed: false
                          GlobalTableId: 0
                          table:
                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        
        hdfs://lassen22.almaden.ibm.com:9100/tmp/hive-aflorat/hive_2013-12-10_19-12-01_556_5005234550872754317-1/-mr-10002
            Reduce Output Operator
              sort order: 
              tag: -1
              value expressions:
                    expr: _col0
                    type: double
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: sum(VALUE._col0)
          bucketGroup: false
          mode: mergepartial
          outputColumnNames: _col0
          Select Operator
            expressions:
                  expr: (_col0 / 7.0)
                  type: double
            outputColumnNames: _col0
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1

