liyunzhang_intel updated HIVE-16600:
------------------------------------
    Attachment: HIVE-16600.4.patch

[~lirui]: uploaded HIVE-16600.4.patch. Changes:
1. add {{-- SORT_QUERY_RESULTS}} to the test

About the question you mentioned:
bq. But the reason we disable parallel order by when there's a limit is to avoid an extra stage (see this [comment|https://issues.apache.org/jira/browse/HIVE-10458?focusedCommentId=14539299&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14539299]). If the extra stage is needed anyway, then it makes no sense to disable parallel order by.

I guess [~xuefuz] means the common order by + limit case, like the following:
{code}
set hive.mapred.mode=nonstrict;
set hive.exec.reducers.bytes.per.reducer=256;
set hive.optimize.sampling.orderby=true;
select key, value from src order by key limit 10;
{code}
The plan is:
{code}
STAGE PLANS:
  Stage: Stage-1
    Spark
      Edges:
        Reducer 2 <- Map 1 (SORT, 1)
      DagName: root_20170515101633_2093e4cb-a060-452c-aa21-201132ea0819:1
      Vertices:
        Map 1 
            Map Operator Tree:
                TableScan
                  alias: src
                  Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                  GatherStats: false
                  Select Operator
                    expressions: key (type: string), value (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator
                      key expressions: _col0 (type: string)
                      null sort order: a
                      sort order: +
                      Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                      tag: -1
                      TopN: 10
                      TopN Hash Memory Usage: 0.1
                      value expressions: _col1 (type: string)
                      auto parallelism: false
            Path -> Alias:
              hdfs://bdpe41:8020/user/hive/warehouse/src [src]
            Path -> Partition:
              hdfs://bdpe41:8020/user/hive/warehouse/src 
                Partition
                  base file name: src
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
                    bucket_count -1
                    column.name.delimiter ,
                    columns key,value
                    columns.comments 'default','default'
                    columns.types string:string
                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    location hdfs://bdpe41:8020/user/hive/warehouse/src
                    name default.src
                    numFiles 1
                    numRows 0
                    rawDataSize 0
                    serialization.ddl struct src { string key, string value}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 5812
                    transient_lastDdlTime 1493960133
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    properties:
                      bucket_count -1
                      column.name.delimiter ,
                      columns key,value
                      columns.comments 'default','default'
                      columns.types string:string
                      file.inputformat org.apache.hadoop.mapred.TextInputFormat
                      file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      location hdfs://bdpe41:8020/user/hive/warehouse/src
                      name default.src
                      numFiles 1
                      numRows 0
                      rawDataSize 0
                      serialization.ddl struct src { string key, string value}
                      serialization.format 1
                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      totalSize 5812
                      transient_lastDdlTime 1493960133
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: default.src
                  name: default.src
            Truncated Path -> Alias:
              /src [src]
        Reducer 2 
            Needs Tagging: false
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                Limit
                  Number of rows: 10
                  Statistics: Num rows: 10 Data size: 2000 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
                    GlobalTableId: 0
                    directory: hdfs://bdpe41:8020/tmp/hive/root/92c15c56-8c37-4ec3-8547-a6912d0ad483/hive_2017-05-15_10-16-33_083_7741809362938040844-1/-mr-10001/.hive-staging_hive_2017-05-15_10-16-33_083_7741809362938040844-1/-ext-10002
                    NumFilesPerFileSink: 1
                    Statistics: Num rows: 10 Data size: 2000 Basic stats: COMPLETE Column stats: NONE
                    Stats Publishing Key Prefix: hdfs://bdpe41:8020/tmp/hive/root/92c15c56-8c37-4ec3-8547-a6912d0ad483/hive_2017-05-15_10-16-33_083_7741809362938040844-1/-mr-10001/.hive-staging_hive_2017-05-15_10-16-33_083_7741809362938040844-1/-ext-10002/
                    table:
                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                        properties:
                          columns _col0,_col1
                          columns.types string:string
                          escape.delim \
                          hive.serialization.extend.additional.nesting.levels true
                          serialization.escape.crlf true
                          serialization.format 1
                          serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    TotalFiles: 1
                    GatherStats: false
                    MultiFileSpray: false

  Stage: Stage-0
    Fetch Operator
      limit: 10
      Processor Tree:
        ListSink
{code}
We can see that the parallelism of the Sort is 1 even when we enable parallel order by ({{set hive.optimize.sampling.orderby=true}}). This is because we use [rdd.sortByKey|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java#L51] to implement the sort. If the parallelism of the sort were greater than 1, order by + limit would be executed across different tasks and the result might be incorrect: if we split the input into 10 tasks and take the top 1 of each task, collecting those 10 rows does not give the top 10 of the whole input.
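To make the correctness problem concrete, here is a minimal standalone sketch using the plain Spark Java API. This is not Hive's {{SortByShuffler}} code; the class name {{SortLimitSketch}}, the generated data, and the partition counts are made up for illustration:
{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SortLimitSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("sort-limit-sketch").setMaster("local[*]");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // 100 rows with keys 0..99.
      List<Tuple2<Integer, String>> rows = new ArrayList<>();
      for (int i = 0; i < 100; i++) {
        rows.add(new Tuple2<>(i, "val" + i));
      }

      // Range-partitioned sort across 10 tasks, as parallel order by would do.
      JavaPairRDD<Integer, String> sorted = sc.parallelizePairs(rows, 10).sortByKey(true, 10);

      // WRONG for order by + limit: taking the head row of each task yields
      // the smallest key of each of the 10 key ranges -- 10 rows that are
      // NOT the global top 10.
      List<Tuple2<Integer, String>> localHeads = sorted
          .mapPartitions(it -> {
            List<Tuple2<Integer, String>> head = new ArrayList<>();
            if (it.hasNext()) {
              head.add(it.next());
            }
            return head.iterator();
          })
          .collect();

      // Getting the global top 10 out of 10 sorted partitions needs another
      // step that looks across partitions -- the "extra stage" mentioned above.
      List<Tuple2<Integer, String>> globalTop10 = sorted.take(10);

      System.out.println("local heads:   " + localHeads);  // one row per key range
      System.out.println("global top 10: " + globalTop10); // keys 0..9
    }
  }
}
{code}
In Hive's plan above the Limit operator sits in the reduce-side tree, so if the sort ran in more than one task each task's Limit would apply only to its own partition; pinning the SORT edge to parallelism 1 avoids that.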
> Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-16600
>                 URL: https://issues.apache.org/jira/browse/HIVE-16600
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>         Attachments: HIVE-16600.1.patch, HIVE-16600.2.patch, HIVE-16600.3.patch, HIVE-16600.4.patch, mr.explain, mr.explain.log.HIVE-16600
>
> multi_insert_gby.case.q
> {code}
> set hive.exec.reducers.bytes.per.reducer=256;
> set hive.optimize.sampling.orderby=true;
> drop table if exists e1;
> drop table if exists e2;
> create table e1 (key string, value string);
> create table e2 (key string);
> FROM (select key, cast(key as double) as keyD, value from src order by key) a
> INSERT OVERWRITE TABLE e1
>     SELECT key, value
> INSERT OVERWRITE TABLE e2
>     SELECT key;
> select * from e1;
> select * from e2;
> {code}
> The parallelism of the Sort is 1 even when we enable parallel order by ("hive.optimize.sampling.orderby" is set to "true"). This is not reasonable because the parallelism should be calculated by [Utilities.estimateReducers|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L170].
> This happens because SetSparkReducerParallelism#needSetParallelism returns false when the [children size of the RS|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207] is greater than 1.
> In this case, the children size of {{RS[2]}} is two.
> The logical plan of the case:
> {code}
> TS[0]-SEL[1]-RS[2]-SEL[3]-SEL[4]-FS[5]
>                   -SEL[6]-FS[7]
> {code}
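For readers following along in the code, here is a paraphrased, standalone sketch of the check the description refers to. The real logic is {{SetSparkReducerParallelism#needSetParallelism}}; the class name, method signature, and parameter names below are hypothetical simplifications for illustration only:
{code}
// Paraphrased sketch only -- not the actual Hive method body.
public class NeedSetParallelismSketch {

  // Returns true when it is safe to let Utilities.estimateReducers pick the
  // reducer count for a sorting ReduceSink (i.e. parallel order by).
  static boolean needSetParallelism(boolean samplingOrderByEnabled, int reduceSinkChildCount) {
    if (!samplingOrderByEnabled) {
      return false; // hive.optimize.sampling.orderby is off: keep one reducer
    }
    // The behavior this JIRA refactors: a ReduceSink with more than one child
    // (the multi-insert shape, where RS[2] ultimately feeds both FS[5] and
    // FS[7]) is rejected, so its sort stays at parallelism 1.
    return reduceSinkChildCount <= 1;
  }

  public static void main(String[] args) {
    System.out.println(needSetParallelism(true, 1)); // true: parallelism gets estimated
    System.out.println(needSetParallelism(true, 2)); // false: multi-insert keeps 1 reducer
  }
}
{code}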