[ 
https://issues.apache.org/jira/browse/HIVE-15474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15765176#comment-15765176
 ] 

Jesus Camacho Rodriguez commented on HIVE-15474:
------------------------------------------------

[~xuefuz], thanks for the feedback. It is indeed true for MR and Tez. I am not 
so familiar with Spark as the backend. If it does not return ordered groups, 
are you sure the sequence of operators is still RS-GB-RS? That would mean that 
other optimizations, such as ReduceSinkDeDuplication, do not work properly 
(\?). In turn, I would argue that RS is a physical operator that has certain 
semantics that should be respected, no matter the backend. However, if the 
sequence of operators is still that one, I can just disable the optimization 
for Hive on Spark.

--

Given this query:
{code:sql}
explain
select key, value, count(key + 1) as agg1 from src 
group by key, value
order by key, value, agg1 limit 20;
{code}

Explain plan without patch:
{code}
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: src
            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE 
Column stats: NONE
            Select Operator
              expressions: key (type: string), value (type: string), 
(UDFToDouble(key) + 1.0) (type: double)
              outputColumnNames: _col0, _col1, _col2
              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE 
Column stats: NONE
              Group By Operator
                aggregations: count(_col2)
                keys: _col0 (type: string), _col1 (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE 
Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string), _col1 (type: string)
                  sort order: ++
                  Map-reduce partition columns: _col0 (type: string), _col1 
(type: string)
                  Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
                  value expressions: _col2 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string), KEY._col1 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE 
Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: string), _col1 (type: string), 
_col2 (type: bigint)
              sort order: +++
              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE 
Column stats: NONE
              TopN Hash Memory Usage: 0.3
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 
(type: string), KEY.reducesinkkey2 (type: bigint)
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE 
Column stats: NONE
          Limit
            Number of rows: 20
            Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE 
Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE 
Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 20
      Processor Tree:
        ListSink
{code}

Explain plan with patch:
{code}
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: src
            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE 
Column stats: NONE
            Select Operator
              expressions: key (type: string), value (type: string), 
(UDFToDouble(key) + 1.0) (type: double)
              outputColumnNames: _col0, _col1, _col2
              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE 
Column stats: NONE
              Group By Operator
                aggregations: count(_col2)
                keys: _col0 (type: string), _col1 (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE 
Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string), _col1 (type: string)
                  sort order: ++
                  Map-reduce partition columns: _col0 (type: string), _col1 
(type: string)
                  Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
                  TopN Hash Memory Usage: 0.3
                  value expressions: _col2 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string), KEY._col1 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE 
Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: string), _col1 (type: string), 
_col2 (type: bigint)
              sort order: +++
              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE 
Column stats: NONE
              TopN Hash Memory Usage: 0.3
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 
(type: string), KEY.reducesinkkey2 (type: bigint)
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE 
Column stats: NONE
          Limit
            Number of rows: 20
            Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE 
Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE 
Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 20
      Processor Tree:
        ListSink
{code}

Observe the only difference is the TopN annotation in the map side RS, which 
prevents shuffling all the data.

> Extend limit propagation for chain of RS-GB-RS operators
> --------------------------------------------------------
>
>                 Key: HIVE-15474
>                 URL: https://issues.apache.org/jira/browse/HIVE-15474
>             Project: Hive
>          Issue Type: Bug
>          Components: Physical Optimizer
>    Affects Versions: 2.2.0
>            Reporter: Jesus Camacho Rodriguez
>            Assignee: Jesus Camacho Rodriguez
>         Attachments: HIVE-15474.patch
>
>
> The goal is to extend the work started in HIVE-14002.
> For instance, given the following query:
> {code:sql}
> explain
> select key, value, count(key + 1) as agg1 from src 
> group by key, value
> order by key, value, agg1 limit 20;
> {code}
> We can push the limit to the GBy operator. However, currently we do not do it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to