[ 
https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liyunzhang_intel updated HIVE-16600:
------------------------------------
    Attachment: HIVE-16600.4.patch

[~lirui]: I have uploaded HIVE-16600.4.patch.
Changes:
1. Added {{-- SORT_QUERY_RESULTS}} to the test.

About the question you mentioned:
bq.But why we disable parallel order by when there's a limit is to avoid an 
extra stage (see this 
[comment|https://issues.apache.org/jira/browse/HIVE-10458?focusedCommentId=14539299&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14539299]).
 If the extra stage is needed anyway, then it makes no sense to disable 
parallel order by.

I guess [~xuefuz] means the common order by + limit case, like the following:
{code}
set hive.mapred.mode=nonstrict;
set hive.exec.reducers.bytes.per.reducer=256;
set hive.optimize.sampling.orderby=true;
select key, value from src order by key limit 10;
{code}

the plan is:
{code}
 STAGE PLANS:
  Stage: Stage-1
    Spark
      Edges:
        Reducer 2 <- Map 1 (SORT, 1)
      DagName: root_20170515101633_2093e4cb-a060-452c-aa21-201132ea0819:1
      Vertices:
        Map 1 
            Map Operator Tree:
                TableScan
                  alias: src
                  Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                  GatherStats: false
                  Select Operator
                    expressions: key (type: string), value (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator
                      key expressions: _col0 (type: string)
                      null sort order: a
                      sort order: +
                      Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                      tag: -1
                      TopN: 10
                      TopN Hash Memory Usage: 0.1
                      value expressions: _col1 (type: string)
                      auto parallelism: false
            Path -> Alias:
              hdfs://bdpe41:8020/user/hive/warehouse/src [src]
            Path -> Partition:
              hdfs://bdpe41:8020/user/hive/warehouse/src 
                Partition
                  base file name: src
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
                    bucket_count -1
                    column.name.delimiter ,
                    columns key,value
                    columns.comments 'default','default'
                    columns.types string:string
                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    location hdfs://bdpe41:8020/user/hive/warehouse/src
                    name default.src
                    numFiles 1
                    numRows 0
                    rawDataSize 0
                    serialization.ddl struct src { string key, string value}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 5812
                    transient_lastDdlTime 1493960133
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    properties:
                      bucket_count -1
                      column.name.delimiter ,
                      columns key,value
                      columns.comments 'default','default'
                      columns.types string:string
                      file.inputformat org.apache.hadoop.mapred.TextInputFormat
                      file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      location hdfs://bdpe41:8020/user/hive/warehouse/src
                      name default.src
                      numFiles 1
                      numRows 0
                      rawDataSize 0
                      serialization.ddl struct src { string key, string value}
                      serialization.format 1
                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      totalSize 5812
                      transient_lastDdlTime 1493960133
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: default.src
                  name: default.src
            Truncated Path -> Alias:
              /src [src]
        Reducer 2 
            Needs Tagging: false
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                Limit
                  Number of rows: 10
                  Statistics: Num rows: 10 Data size: 2000 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
                    GlobalTableId: 0
                    directory: hdfs://bdpe41:8020/tmp/hive/root/92c15c56-8c37-4ec3-8547-a6912d0ad483/hive_2017-05-15_10-16-33_083_7741809362938040844-1/-mr-10001/.hive-staging_hive_2017-05-15_10-16-33_083_7741809362938040844-1/-ext-10002
                    NumFilesPerFileSink: 1
                    Statistics: Num rows: 10 Data size: 2000 Basic stats: COMPLETE Column stats: NONE
                    Stats Publishing Key Prefix: hdfs://bdpe41:8020/tmp/hive/root/92c15c56-8c37-4ec3-8547-a6912d0ad483/hive_2017-05-15_10-16-33_083_7741809362938040844-1/-mr-10001/.hive-staging_hive_2017-05-15_10-16-33_083_7741809362938040844-1/-ext-10002/
                    table:
                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                        properties:
                          columns _col0,_col1
                          columns.types string:string
                          escape.delim \
                          hive.serialization.extend.additional.nesting.levels true
                          serialization.escape.crlf true
                          serialization.format 1
                          serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    TotalFiles: 1
                    GatherStats: false
                    MultiFileSpray: false

  Stage: Stage-0
    Fetch Operator
      limit: 10
      Processor Tree:
        ListSink
{code}

We can see that the parallelism of the sort is 1 even when parallel order by 
is enabled (set hive.optimize.sampling.orderby=true). This is because we use 
[rdd.sortByKey|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java#L51]
 to implement the sort. If the parallelism of the sort were not 1, order by + 
limit would be executed in several tasks, and the result may be incorrect. For 
example, if we divide the input into 10 subtasks, take the top 1 of each, and 
collect the results, the collected rows may not be the top 10 of the input.
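
To make the last point concrete, here is a rough sketch in plain Python (not Hive or Spark code; the helper name {{order_by_limit}} and the synthetic keys are made up for illustration). It compares taking the top 1 from each of 10 tasks with letting every task keep its own top 10 and then merging the candidates in a single final task. The final merge is, as far as I understand, the kind of extra stage discussed in the quoted comment.

```python
import heapq


def order_by_limit(rows, n):
    """Hypothetical stand-in for ORDER BY key LIMIT n over one set of rows."""
    return heapq.nsmallest(n, rows)


# Synthetic, unsorted keys 0..99 (37 is coprime to 100, so this is a permutation).
keys = [(i * 37) % 100 for i in range(100)]

# Divide the input into 10 tasks of 10 rows each.
tasks = [keys[i:i + 10] for i in range(0, 100, 10)]

# What a single-reducer ORDER BY ... LIMIT 10 returns.
expected = order_by_limit(keys, 10)

# Incorrect: every task contributes only its top 1 row.
top1_per_task = sorted(order_by_limit(t, 1)[0] for t in tasks)

# Correct: every task keeps its top 10, then one final task merges the candidates.
candidates = [k for t in tasks for k in order_by_limit(t, 10)]
merged = order_by_limit(candidates, 10)

print("expected     :", expected)
print("top1 per task:", top1_per_task)
print("merged       :", merged)
```

In this sketch the second task alone holds two of the ten smallest keys (3 and 7), so collecting one row per task cannot give the right answer, while the merged result matches the single-reducer result.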

> Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel 
> order by in multi_insert cases
> --------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-16600
>                 URL: https://issues.apache.org/jira/browse/HIVE-16600
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>         Attachments: HIVE-16600.1.patch, HIVE-16600.2.patch, 
> HIVE-16600.3.patch, HIVE-16600.4.patch, mr.explain, mr.explain.log.HIVE-16600
>
>
> multi_insert_gby.case.q
> {code}
> set hive.exec.reducers.bytes.per.reducer=256;
> set hive.optimize.sampling.orderby=true;
> drop table if exists e1;
> drop table if exists e2;
> create table e1 (key string, value string);
> create table e2 (key string);
> FROM (select key, cast(key as double) as keyD, value from src order by key) a
> INSERT OVERWRITE TABLE e1
>     SELECT key, value
> INSERT OVERWRITE TABLE e2
>     SELECT key;
> select * from e1;
> select * from e2;
> {code} 
> The parallelism of the sort is 1 even when parallel order by is enabled 
> ("hive.optimize.sampling.orderby" is set to "true"). This is not 
> reasonable because the parallelism should be calculated by 
> [Utilities.estimateReducers|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L170].
> This happens because SetSparkReducerParallelism#needSetParallelism returns 
> false when the [children size of 
> RS|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207]
>  is greater than 1.
> In this case, the children size of {{RS[2]}} is two.
> The logical plan of the case:
> {code}
>    TS[0]-SEL[1]-RS[2]-SEL[3]-SEL[4]-FS[5]
>                             -SEL[6]-FS[7]
> {code}



