[ https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041829#comment-16041829 ]
Xuefu Zhang commented on HIVE-16840:
------------------------------------

[~kellyzly], I think you're right and I was confused: sortByKey does produce a global order. The reason you gave in the description is also accurate.

Looking back at your proposals, #1 is similar to a select in a subquery that outputs ordered data. It depends on SELECT following FETCH FIRST semantics, and I'm not sure that is reliable. Proposal #2 seems more plausible, except for zipWithIndex(), which could be expensive.

Maybe we can combine the two: first, do a parallel sort (with N partitions) and keep only the first M rows of each partition (M is the limit), then do another sort (with 1 partition) with a limit of M. This way, the single task sorts at most MxN rows, which should be fast if MxN is small. Basically we will do this:
{code}
// N = parallelism of the first sort, M = limit
val composite1 = sc.parallelize(1 to 200, 10).map(p => (1 - p, p))
  .sortByKey(numPartitions = N).mapPartitions(_.take(M))
  .sortByKey(numPartitions = 1).take(M)
{code}
Could you please check if this is possible?

> Investigate the performance of order by limit in HoS
> ----------------------------------------------------
>
>                 Key: HIVE-16840
>                 URL: https://issues.apache.org/jira/browse/HIVE-16840
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>
> We found that q17 of TPC-DS hung on 1TB of TPC-DS data.
> {code}
> select  i_item_id
>        ,i_item_desc
>        ,s_state
>        ,count(ss_quantity) as store_sales_quantitycount
>        ,avg(ss_quantity) as store_sales_quantityave
>        ,stddev_samp(ss_quantity) as store_sales_quantitystdev
>        ,stddev_samp(ss_quantity)/avg(ss_quantity) as store_sales_quantitycov
>        ,count(sr_return_quantity) as_store_returns_quantitycount
>        ,avg(sr_return_quantity) as_store_returns_quantityave
>        ,stddev_samp(sr_return_quantity) as_store_returns_quantitystdev
>        ,stddev_samp(sr_return_quantity)/avg(sr_return_quantity) as store_returns_quantitycov
>        ,count(cs_quantity) as catalog_sales_quantitycount
>        ,avg(cs_quantity) as catalog_sales_quantityave
>        ,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitystdev
>        ,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitycov
>  from store_sales
>      ,store_returns
>      ,catalog_sales
>      ,date_dim d1
>      ,date_dim d2
>      ,date_dim d3
>      ,store
>      ,item
>  where d1.d_quarter_name = '2000Q1'
>    and d1.d_date_sk = store_sales.ss_sold_date_sk
>    and item.i_item_sk = store_sales.ss_item_sk
>    and store.s_store_sk = store_sales.ss_store_sk
>    and store_sales.ss_customer_sk = store_returns.sr_customer_sk
>    and store_sales.ss_item_sk = store_returns.sr_item_sk
>    and store_sales.ss_ticket_number = store_returns.sr_ticket_number
>    and store_returns.sr_returned_date_sk = d2.d_date_sk
>    and d2.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
>    and store_returns.sr_customer_sk = catalog_sales.cs_bill_customer_sk
>    and store_returns.sr_item_sk = catalog_sales.cs_item_sk
>    and catalog_sales.cs_sold_date_sk = d3.d_date_sk
>    and d3.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
>  group by i_item_id
>          ,i_item_desc
>          ,s_state
>  order by i_item_id
>          ,i_item_desc
>          ,s_state
>  limit 100;
> {code}
> The script hung because we use only 1 task to implement the sort.
> {code}
> STAGE PLANS:
>   Stage: Stage-1
>     Spark
>       Edges:
>         Reducer 10 <- Reducer 9 (SORT, 1)
>         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 889), Map 11 (PARTITION-LEVEL SORT, 889)
>         Reducer 3 <- Map 12 (PARTITION-LEVEL SORT, 1009), Reducer 2 (PARTITION-LEVEL SORT, 1009)
>         Reducer 4 <- Map 13 (PARTITION-LEVEL SORT, 683), Reducer 3 (PARTITION-LEVEL SORT, 683)
>         Reducer 5 <- Map 14 (PARTITION-LEVEL SORT, 751), Reducer 4 (PARTITION-LEVEL SORT, 751)
>         Reducer 6 <- Map 15 (PARTITION-LEVEL SORT, 826), Reducer 5 (PARTITION-LEVEL SORT, 826)
>         Reducer 7 <- Map 16 (PARTITION-LEVEL SORT, 909), Reducer 6 (PARTITION-LEVEL SORT, 909)
>         Reducer 8 <- Map 17 (PARTITION-LEVEL SORT, 1001), Reducer 7 (PARTITION-LEVEL SORT, 1001)
>         Reducer 9 <- Reducer 8 (GROUP, 2)
> {code}
> The parallelism of Reducer 9 is 1. It is an order by limit case, so we use 1 task to execute it and ensure correctness, but the performance is poor.
> The reason why we use 1 task to implement order by limit is
> [here|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207]

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
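
For reference, the two-stage idea from the comment (sort in parallel, keep the first M rows of each partition, then sort the survivors in a single task and apply the limit) can be checked without a cluster. The following is only a sketch in plain Scala collections, not Hive or Spark code: `TopMSketch` and `topM` are hypothetical names, and the partitions are emulated with fixed-size chunks rather than the range partitioner that sortByKey would use.

```scala
object TopMSketch {
  // Hypothetical helper, not Hive or Spark code: top-m rows via per-partition pruning.
  def topM(rows: Seq[Int], n: Int, m: Int): Seq[Int] = {
    require(n > 0 && m >= 0, "n must be positive and m non-negative")
    // Split the input into at most n chunks that stand in for partitions.
    val chunkSize = math.max(1, math.ceil(rows.size.toDouble / n).toInt)
    val partitions = rows.grouped(chunkSize).toSeq
    // Stage 1 (parallel in a real engine): sort each partition, keep its first m rows.
    val survivors = partitions.flatMap(_.sorted.take(m))
    // Stage 2 (single task): sort the at most m * n survivors and apply the limit.
    survivors.sorted.take(m)
  }

  def main(args: Array[String]): Unit = {
    // Same keys as the snippet in the comment: 1 - p for p in 1 to 200.
    val rows = (1 to 200).map(p => 1 - p)
    val result = topM(rows, n = 10, m = 5)
    // The pruned plan must agree with a plain global sort followed by the limit.
    assert(result == rows.sorted.take(5))
    println(result.mkString(", "))
  }
}
```

The final single-task sort never sees more than M x N rows, which is the property the proposal relies on. Whether Hive on Spark can generate such a plan is the open question in the comment, and this sketch does not address it.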