[ 
https://issues.apache.org/jira/browse/HIVE-22666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krisztian Kasa updated HIVE-22666:
----------------------------------
    Description: 
{code}
EXPLAIN EXTENDED
SELECT s_state, ranking
FROM (
 SELECT s_state AS s_state,
 rank() OVER (PARTITION BY s_state ORDER BY ss_net_profit) AS ranking
 FROM testtable_n1000) tmp1
 WHERE ranking <= 3;
{code}
{code}
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Tez
#### A masked pattern was here ####
      Edges:
        Reducer 2 <- Map 1 (SIMPLE_EDGE)
#### A masked pattern was here ####
      Vertices:
        Map 1 
            Map Operator Tree:
                TableScan
                  alias: testtable_n1000
                  Statistics: Num rows: 10 Data size: 940 Basic stats: COMPLETE Column stats: COMPLETE
                  GatherStats: false
                  Reduce Output Operator
                    key expressions: s_state (type: string), ss_net_profit (type: double)
                    null sort order: az
                    sort order: ++
                    Map-reduce partition columns: s_state (type: string)
                    Statistics: Num rows: 10 Data size: 940 Basic stats: COMPLETE Column stats: COMPLETE
                    tag: -1
                    TopN: 4
                    TopN Hash Memory Usage: 0.1
                    auto parallelism: true
            Execution mode: vectorized, llap
            LLAP IO: no inputs
            Path -> Alias:
#### A masked pattern was here ####
            Path -> Partition:
#### A masked pattern was here ####
                Partition
                  base file name: testtable_n1000
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
                    COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"s_state":"true","ss_net_profit":"true"}}
                    bucket_count -1
                    bucketing_version 2
                    column.name.delimiter ,
                    columns s_state,ss_net_profit
                    columns.comments 
                    columns.types string:double
#### A masked pattern was here ####
                    name default.testtable_n1000
                    numFiles 1
                    numRows 10
                    rawDataSize 80
                    serialization.ddl struct testtable_n1000 { string s_state, double ss_net_profit}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 90
#### A masked pattern was here ####
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    properties:
                      COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"s_state":"true","ss_net_profit":"true"}}
                      bucket_count -1
                      bucketing_version 2
                      column.name.delimiter ,
                      columns s_state,ss_net_profit
                      columns.comments 
                      columns.types string:double
#### A masked pattern was here ####
                      name default.testtable_n1000
                      numFiles 1
                      numRows 10
                      rawDataSize 80
                      serialization.ddl struct testtable_n1000 { string s_state, double ss_net_profit}
                      serialization.format 1
                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      totalSize 90
#### A masked pattern was here ####
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: default.testtable_n1000
                  name: default.testtable_n1000
            Truncated Path -> Alias:
              /testtable_n1000 [testtable_n1000]
        Reducer 2 
            Execution mode: vectorized, llap
            Needs Tagging: false
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: double)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 10 Data size: 3620 Basic stats: COMPLETE Column stats: COMPLETE
                PTF Operator
                  Function definitions:
                      Input definition
                        input alias: ptf_0
                        output shape: _col0: string, _col1: double
                        type: WINDOWING
                      Windowing table definition
                        input alias: ptf_1
                        name: windowingtablefunction
                        order by: _col1 ASC NULLS LAST
                        partition by: _col0
                        raw input shape:
                        window functions:
                            window function definition
                              alias: rank_window_0
                              arguments: _col1
                              name: rank
                              window function: GenericUDAFRankEvaluator
                              window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
                              isPivotResult: true
                  Statistics: Num rows: 10 Data size: 3620 Basic stats: COMPLETE Column stats: COMPLETE
                  Filter Operator
                    isSamplingPred: false
                    predicate: (rank_window_0 <= 3) (type: boolean)
                    Statistics: Num rows: 3 Data size: 1086 Basic stats: COMPLETE Column stats: COMPLETE
                    Select Operator
                      expressions: _col0 (type: string), rank_window_0 (type: int)
                      outputColumnNames: _col0, _col1
                      Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE
                      File Output Operator
                        compressed: false
                        GlobalTableId: 0
#### A masked pattern was here ####
                        NumFilesPerFileSink: 1
                        Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE
#### A masked pattern was here ####
                        table:
                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                            properties:
                              columns _col0,_col1
                              columns.types string:int
                              escape.delim \
                              hive.serialization.extend.additional.nesting.levels true
                              serialization.escape.crlf true
                              serialization.format 1
                              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                        TotalFiles: 1
                        GatherStats: false
                        MultiFileSpray: false

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
{code}

In this case the topN value (3) from the filter ranking <= 3 is pushed down to the ReduceSink operator; the plan above reports it as TopN: 4.
https://github.com/apache/hive/blob/520aa19b20381bfd2ed25c835443c013f6e6ebb9/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java#L257

The ReduceSink operator uses PTFTopNHash to keep the topN rows for each value of the partition key (s_state).

The goals of this jira are:
- add partitioning support to TopNKeyOperator
- enable push down of the partitioned TopNKeyOperator
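
To make the first goal concrete, here is a minimal sketch of a partition-aware top-N key filter. It is not Hive's TopNKeyOperator or PTFTopNHash code: the class name PartitionedTopNKeyFilter and the method canForward are hypothetical, and it assumes a single string partition key and a single ascending double order key, as in the query above. For each partition it remembers the N smallest distinct order keys seen so far and rejects rows whose key is larger than all of them. Rows with rank <= N always pass, so the filter only reduces the rows sent downstream and the exact rank filter still has to run afterwards.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical illustration only; not taken from the Hive code base.
final class PartitionedTopNKeyFilter {
    private final int topN;
    // For each partition key, the topN smallest distinct order keys seen so far.
    private final Map<String, TreeSet<Double>> topKeysByPartition = new HashMap<>();

    PartitionedTopNKeyFilter(int topN) {
        if (topN <= 0) {
            throw new IllegalArgumentException("topN must be positive");
        }
        this.topN = topN;
    }

    /** Returns true if the row can still be in the top N of its partition. */
    boolean canForward(String partitionKey, double orderKey) {
        TreeSet<Double> topKeys =
            topKeysByPartition.computeIfAbsent(partitionKey, k -> new TreeSet<>());
        // Room left in this partition, or a tie with a key that is already kept.
        if (topKeys.size() < topN || topKeys.contains(orderKey)) {
            topKeys.add(orderKey);
            return true;
        }
        // Smaller than the largest kept key: it replaces that key.
        if (orderKey < topKeys.last()) {
            topKeys.add(orderKey);
            topKeys.pollLast();
            return true;
        }
        // Larger than every kept key: the row cannot reach the top N.
        return false;
    }

    public static void main(String[] args) {
        PartitionedTopNKeyFilter filter = new PartitionedTopNKeyFilter(3);
        String[] states = {"CA", "CA", "NY", "CA", "CA", "NY"};
        double[] profits = {5.0, 3.0, 9.0, 7.0, 8.0, 1.0};
        for (int i = 0; i < states.length; i++) {
            boolean forwarded = filter.canForward(states[i], profits[i]);
            System.out.println(states[i] + " " + profits[i] + " forwarded=" + forwarded);
        }
    }
}
```

Because the filter sees rows in arrival order, it may forward a row that a later, smaller key pushes out of the top N. That is acceptable for a map-side pre-filter, whose purpose is only to cut the data shuffled to the reducer.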


  was:
{code}
EXPLAIN EXTENDED
SELECT s_state, ranking
FROM (
 SELECT s_state AS s_state,
 rank() OVER (PARTITION BY s_state ORDER BY ss_net_profit) AS ranking
 FROM testtable_n1000) tmp1
 WHERE ranking <= 3;
{code}

In this case the topN value (3) will be pushed to the ReduceSink operator
https://github.com/apache/hive/blob/520aa19b20381bfd2ed25c835443c013f6e6ebb9/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java#L257

ReduceSink operator uses PTFTopNHash to get the topN rows for each partition 
key (s_state) value.

The goals of this jira are:
- implement supporting partitioning in TopNKeyOperator
- enable push down of partitioned TopNKeyOperator



> Introduce TopNKey operator for PTF Reduce Sink
> ----------------------------------------------
>
>                 Key: HIVE-22666
>                 URL: https://issues.apache.org/jira/browse/HIVE-22666
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Krisztian Kasa
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
