[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16011763#comment-16011763 ]
liyunzhang_intel commented on HIVE-16600:
-----------------------------------------

[~lirui]:
bq. For orderBy + Limit query, we have two choices: use multiple reducers to get global order, and then shuffle to get global limit. Or we can use single reducer to do the order and limit together. I think the latter is better because we don't actually need to get a global order.
agree
bq. In our multi insert example, it seems we cannot choose the latter. I suspect that's because there's only one limit. liyunzhang_intel, could you try adding limit to both inserts and see what happens?
It seems that MR also uses an extra shuffle in the two-limit case:
{code}
set hive.mapred.mode=nonstrict;
set hive.exec.reducers.bytes.per.reducer=256;
set hive.optimize.sampling.orderby=true;

drop table if exists e1;
drop table if exists e2;
create table e1 (key string, value string);
create table e2 (key string);

FROM (select key, value from src order by key) a
INSERT OVERWRITE TABLE e1 SELECT key, value limit 5
INSERT OVERWRITE TABLE e2 SELECT key limit 10;

select * from e1;
select * from e2;
{code}
The explain output:
{code}
Stage-2 is a root stage [MAPRED]
Stage-3 depends on stages: Stage-2 [MAPRED]
Stage-0 depends on stages: Stage-3 [MOVE]
Stage-4 depends on stages: Stage-0 [STATS]
Stage-5 depends on stages: Stage-2 [MAPRED]
Stage-1 depends on stages: Stage-5 [MOVE]
Stage-6 depends on stages: Stage-1 [STATS]

STAGE PLANS:
  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: src
            GatherStats: false
            Select Operator
              expressions: key (type: string), value (type: string)
              outputColumnNames: _col0, _col1
              Reduce Output Operator
                key expressions: _col0 (type: string)
                null sort order: a
                sort order: +
                tag: -1
                value expressions: _col1 (type: string)
                auto parallelism: false
      Path -> Alias:
        hdfs://bdpe41:8020/user/hive/warehouse/src [a:src]
      Path -> Partition:
        hdfs://bdpe41:8020/user/hive/warehouse/src
          Partition
            base file name: src
            input format: org.apache.hadoop.mapred.TextInputFormat
            output format:
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            properties:
              bucket_count -1
              column.name.delimiter ,
              columns key,value
              columns.comments 'default','default'
              columns.types string:string
              file.inputformat org.apache.hadoop.mapred.TextInputFormat
              file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              location hdfs://bdpe41:8020/user/hive/warehouse/src
              name default.src
              numFiles 1
              numRows 0
              rawDataSize 0
              serialization.ddl struct src { string key, string value}
              serialization.format 1
              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              totalSize 5812
              transient_lastDdlTime 1493960133
            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            input format: org.apache.hadoop.mapred.TextInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            properties:
              bucket_count -1
              column.name.delimiter ,
              columns key,value
              columns.comments 'default','default'
              columns.types string:string
              file.inputformat org.apache.hadoop.mapred.TextInputFormat
              file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              location hdfs://bdpe41:8020/user/hive/warehouse/src
              name default.src
              numFiles 1
              numRows 0
              rawDataSize 0
              serialization.ddl struct src { string key, string value}
              serialization.format 1
              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              totalSize 5812
              transient_lastDdlTime 1493960133
            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            name: default.src
          name: default.src
      Truncated Path -> Alias:
        /src [a:src]
      Needs Tagging: false
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
          outputColumnNames: _col0, _col1
          Limit
            Number of rows: 5
            File Output Operator
              compressed: false
              GlobalTableId: 0
              directory: hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004
              NumFilesPerFileSink: 1
              table:
                  input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  properties:
                    column.name.delimiter ,
                    columns _col0,_col1
                    columns.types string,string
                    escape.delim \
                    serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
              TotalFiles: 1
              GatherStats: false
              MultiFileSpray: false
        Select Operator
          expressions: _col0 (type: string)
          outputColumnNames: _col0
          Limit
            Number of rows: 10
            File Output Operator
              compressed: false
              GlobalTableId: 0
              directory: hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005
              NumFilesPerFileSink: 1
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  properties:
                    column.name.delimiter ,
                    columns _col0
                    columns.types string
                    escape.delim \
                    serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
              TotalFiles: 1
              GatherStats: false
              MultiFileSpray: false

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            GatherStats: false
            Reduce Output Operator
              null sort order:
              sort order:
              tag: -1
              TopN: 5
              TopN Hash Memory Usage: 0.1
              value expressions: _col0 (type: string), _col1 (type: string)
              auto parallelism: false
      Path -> Alias:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004 [hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004]
      Path -> Partition:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004
          Partition
            base file name: -mr-10004
            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
            output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
            properties:
              column.name.delimiter ,
              columns _col0,_col1
              columns.types string,string
              escape.delim \
              serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
            properties:
              column.name.delimiter ,
              columns _col0,_col1
              columns.types string,string
              escape.delim \
              serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Truncated Path -> Alias:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004 [hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004]
      Needs Tagging: false
      Reduce Operator Tree:
        Select Operator
          expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
          outputColumnNames: _col0, _col1
          Limit
            Number of rows: 5
            File Output Operator
              compressed: false
              GlobalTableId: 1
              directory: hdfs://bdpe41:8020/user/hive/warehouse/e1/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10000
              NumFilesPerFileSink: 1
              Stats Publishing Key Prefix: hdfs://bdpe41:8020/user/hive/warehouse/e1/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10000/
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
                    COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                    bucket_count -1
                    column.name.delimiter ,
                    columns key,value
                    columns.comments
                    columns.types string:string
                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    location hdfs://bdpe41:8020/user/hive/warehouse/e1
                    name default.e1
numFiles 0
                    numRows 0
                    rawDataSize 0
                    serialization.ddl struct e1 { string key, string value}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 0
                    transient_lastDdlTime 1494911492
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                  name: default.e1
              TotalFiles: 1
              GatherStats: true
              MultiFileSpray: false

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          source: hdfs://bdpe41:8020/user/hive/warehouse/e1/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10000
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              properties:
                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                bucket_count -1
                column.name.delimiter ,
                columns key,value
                columns.comments
                columns.types string:string
                file.inputformat org.apache.hadoop.mapred.TextInputFormat
                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                location hdfs://bdpe41:8020/user/hive/warehouse/e1
                name default.e1
                numFiles 0
                numRows 0
                rawDataSize 0
                serialization.ddl struct e1 { string key, string value}
                serialization.format 1
                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                totalSize 0
                transient_lastDdlTime 1494911492
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.e1

  Stage: Stage-4
    Stats-Aggr Operator
      Stats Aggregation Key Prefix: hdfs://bdpe41:8020/user/hive/warehouse/e1/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10000/

  Stage: Stage-5
    Map Reduce
      Map Operator Tree:
          TableScan
            GatherStats: false
            Reduce Output Operator
              null sort order:
              sort order:
              tag: -1
              TopN: 10
              TopN Hash Memory Usage: 0.1
              value expressions: _col0 (type: string)
              auto parallelism: false
      Path -> Alias:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005
[hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005]
      Path -> Partition:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005
          Partition
            base file name: -mr-10005
            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
            properties:
              column.name.delimiter ,
              columns _col0
              columns.types string
              escape.delim \
              serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
            properties:
              column.name.delimiter ,
              columns _col0
              columns.types string
              escape.delim \
              serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Truncated Path -> Alias:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005 [hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005]
      Needs Tagging: false
      Reduce Operator Tree:
        Select Operator
          expressions: VALUE._col0 (type: string)
          outputColumnNames: _col0
          Limit
            Number of rows: 10
            File Output Operator
              compressed: false
              GlobalTableId: 2
              directory: hdfs://bdpe41:8020/user/hive/warehouse/e2/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10002
              NumFilesPerFileSink: 1
              Stats Publishing Key Prefix: hdfs://bdpe41:8020/user/hive/warehouse/e2/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10002/
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                    bucket_count -1
                    column.name.delimiter ,
                    columns key
                    columns.comments
                    columns.types string
                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    location hdfs://bdpe41:8020/user/hive/warehouse/e2
                    name default.e2
                    numFiles 0
                    numRows 0
                    rawDataSize 0
                    serialization.ddl struct e2 { string key}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 0
                    transient_lastDdlTime 1494911492
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                  name: default.e2
              TotalFiles: 1
              GatherStats: true
              MultiFileSpray: false

  Stage: Stage-1
    Move Operator
      tables:
          replace: true
          source: hdfs://bdpe41:8020/user/hive/warehouse/e2/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10002
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              properties:
                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                bucket_count -1
                column.name.delimiter ,
                columns key
                columns.comments
                columns.types string
                file.inputformat org.apache.hadoop.mapred.TextInputFormat
                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                location hdfs://bdpe41:8020/user/hive/warehouse/e2
                name default.e2
                numFiles 0
                numRows 0
                rawDataSize 0
                serialization.ddl struct e2 { string key}
                serialization.format 1
                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                totalSize 0
                transient_lastDdlTime 1494911492
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.e2

  Stage: Stage-6
    Stats-Aggr Operator
      Stats Aggregation Key Prefix: hdfs://bdpe41:8020/user/hive/warehouse/e2/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10002/
{code}
bq. Ideally, we should be able to use single reducer if all the inserts have limit (with same or similar number perhaps).
If not, we shouldn't disable parallel order by for multi insert.
agree, so this logic is updated in HIVE-16600.5.patch. There is a big difference between order by + limit and order by + multi insert (containing limit): the result must be in order in the former, while it may not be in order in the latter.

> Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
> --------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-16600
>                 URL: https://issues.apache.org/jira/browse/HIVE-16600
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>         Attachments: HIVE-16600.1.patch, HIVE-16600.2.patch, HIVE-16600.3.patch, HIVE-16600.4.patch, mr.explain, mr.explain.log.HIVE-16600
>
> multi_insert_gby.case.q
> {code}
> set hive.exec.reducers.bytes.per.reducer=256;
> set hive.optimize.sampling.orderby=true;
> drop table if exists e1;
> drop table if exists e2;
> create table e1 (key string, value string);
> create table e2 (key string);
> FROM (select key, cast(key as double) as keyD, value from src order by key) a
> INSERT OVERWRITE TABLE e1
> SELECT key, value
> INSERT OVERWRITE TABLE e2
> SELECT key;
> select * from e1;
> select * from e2;
> {code}
> The parallelism of Sort is 1 even when we enable parallel order by ("hive.optimize.sampling.orderby" is set to "true"). This is not reasonable because the parallelism should be calculated by [Utilities.estimateReducers|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L170].
> This is because SetSparkReducerParallelism#needSetParallelism returns false when the [children size of RS|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207] is greater than 1.
> In this case, the children size of {{RS[2]}} is two.
> The logical plan of the case:
> {code}
> TS[0]-SEL[1]-RS[2]-SEL[3]-SEL[4]-FS[5]
>                   -SEL[6]-FS[7]
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
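The decision discussed in this thread can be summarized as a tree check: a sorting ReduceSink may keep parallel reducers in a multi-insert plan as long as no downstream branch applies a Limit, since a limit after a parallel sort would need either a single reducer or an extra shuffle to be globally correct. The sketch below is a hypothetical simplification for illustration only, not Hive's actual SetSparkReducerParallelism code; the {{OpNode}} class and method names are invented here.

```java
import java.util.ArrayList;
import java.util.List;

public class ParallelOrderByCheck {

    /** Minimal stand-in for an operator-tree node (hypothetical, not Hive's Operator class). */
    static class OpNode {
        final String name;
        final List<OpNode> children = new ArrayList<>();
        OpNode(String name) { this.name = name; }
        OpNode add(OpNode child) { children.add(child); return this; }
    }

    /** Returns true if any operator at or below this node is a Limit (named "LIM" here). */
    static boolean hasLimit(OpNode op) {
        if (op.name.equals("LIM")) {
            return true;
        }
        for (OpNode c : op.children) {
            if (hasLimit(c)) {
                return true;
            }
        }
        return false;
    }

    /** Parallel order by is safe when no branch under the sorting RS limits rows. */
    static boolean canUseParallelOrderBy(OpNode rs) {
        return !hasLimit(rs);
    }

    public static void main(String[] args) {
        // Multi-insert without limit, shaped like the plan in the issue:
        // RS[2]-SEL[3]-SEL[4]-FS[5] and a second branch -SEL[6]-FS[7].
        OpNode rs = new OpNode("RS");
        OpNode sel = new OpNode("SEL");
        rs.add(sel);
        sel.add(new OpNode("SEL").add(new OpNode("FS")));
        sel.add(new OpNode("SEL").add(new OpNode("FS")));
        System.out.println(canUseParallelOrderBy(rs)); // prints true: parallel sort is fine

        // Same shape, but one branch ends in a Limit: fall back to a single reducer.
        sel.add(new OpNode("LIM").add(new OpNode("FS")));
        System.out.println(canUseParallelOrderBy(rs)); // prints false
    }
}
```

This matches the conclusion above: a plain multi-insert over an order by keeps parallel reducers, while any insert carrying a limit forces the single-reducer path.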