Thanks Michael for the reply. Below is the sql plan for 1.5 and 1.4. 1.5 is using SortMergeJoin, while 1.4.1 is using shuffled hash join.
In this case, it seems hash join performs better than sort join. =========================================================================================================================================== =1.5.0===================================================================================================================================== =========================================================================================================================================== == Parsed Logical Plan == 'Project [unresolvedalias('t1.ss_quantity),unresolvedalias('t1.ss_list_price),unresolvedalias('t1.ss_coupon_amt),unresolvedalias('t1.ss_cdemo_sk),unresolvedalias('t1.ss_item_sk),unresolvedalias('t1.ss_promo_sk),unresolvedalias('t1.ss_sold_date_sk)] 'Filter (('t1.ss_sold_date_sk >= 2450815) && ('t1.ss_sold_date_sk <= 2451179)) 'Join Inner, Some(('t1.ss_item_sk = 't2.ss_item_sk)) 'UnresolvedRelation [store_sales], Some(t1) 'UnresolvedRelation [store_sales], Some(t2) == Analyzed Logical Plan == ss_quantity: int, ss_list_price: decimal(7,2), ss_coupon_amt: decimal(7,2), ss_cdemo_sk: int, ss_item_sk: int, ss_promo_sk: int, ss_sold_date_sk: int Project [ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46] Filter ((ss_sold_date_sk#46 >= 2450815) && (ss_sold_date_sk#46 <= 2451179)) Join Inner, Some((ss_item_sk#48 = ss_item_sk#71)) Subquery t1 Subquery store_sales Relation[ss_sold_date_sk#46,ss_sold_time_sk#47,ss_item_sk#48,ss_customer_sk#49,ss_cdemo_sk#50,ss_hdemo_sk#51,ss_addr_sk#52,ss_store_sk#53,ss_promo_sk#54,ss_ticket_number#55,ss_quantity#56,ss_wholesale_cost#57,ss_list_price#58,ss_sales_price#59,ss_ext_discount_amt#60,ss_ext_sales_price#61,ss_ext_wholesale_cost#62,ss_ext_list_price#63,ss_ext_tax#64,ss_coupon_amt#65,ss_net_paid#66,ss_net_paid_inc_tax#67,ss_net_profit#68] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales] Subquery t2 Subquery store_sales Relation[ss_sold_date_sk#69,ss_sold_time_sk#70,ss_item_sk#71,ss_customer_sk#72,ss_cdemo_sk#73,ss_hdemo_sk#74,ss_addr_sk#75,ss_store_sk#76,ss_promo_sk#77,ss_ticket_number#78,ss_quantity#79,ss_wholesale_cost#80,ss_list_price#81,ss_sales_price#82,ss_ext_discount_amt#83,ss_ext_sales_price#84,ss_ext_wholesale_cost#85,ss_ext_list_price#86,ss_ext_tax#87,ss_coupon_amt#88,ss_net_paid#89,ss_net_paid_inc_tax#90,ss_net_profit#91] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales] == Optimized Logical Plan == Project [ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46] Join Inner, Some((ss_item_sk#48 = ss_item_sk#71)) Project [ss_cdemo_sk#50,ss_promo_sk#54,ss_coupon_amt#65,ss_list_price#58,ss_sold_date_sk#46,ss_quantity#56,ss_item_sk#48] Filter ((ss_sold_date_sk#46 >= 2450815) && (ss_sold_date_sk#46 <= 2451179)) Relation[ss_sold_date_sk#46,ss_sold_time_sk#47,ss_item_sk#48,ss_customer_sk#49,ss_cdemo_sk#50,ss_hdemo_sk#51,ss_addr_sk#52,ss_store_sk#53,ss_promo_sk#54,ss_ticket_number#55,ss_quantity#56,ss_wholesale_cost#57,ss_list_price#58,ss_sales_price#59,ss_ext_discount_amt#60,ss_ext_sales_price#61,ss_ext_wholesale_cost#62,ss_ext_list_price#63,ss_ext_tax#64,ss_coupon_amt#65,ss_net_paid#66,ss_net_paid_inc_tax#67,ss_net_profit#68] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales] Project [ss_item_sk#71] Relation[ss_sold_date_sk#69,ss_sold_time_sk#70,ss_item_sk#71,ss_customer_sk#72,ss_cdemo_sk#73,ss_hdemo_sk#74,ss_addr_sk#75,ss_store_sk#76,ss_promo_sk#77,ss_ticket_number#78,ss_quantity#79,ss_wholesale_cost#80,ss_list_price#81,ss_sales_price#82,ss_ext_discount_amt#83,ss_ext_sales_price#84,ss_ext_wholesale_cost#85,ss_ext_list_price#86,ss_ext_tax#87,ss_coupon_amt#88,ss_net_paid#89,ss_net_paid_inc_tax#90,ss_net_profit#91] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales] == Physical Plan == TungstenProject [ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46] SortMergeJoin [ss_item_sk#48], [ss_item_sk#71] TungstenSort [ss_item_sk#48 ASC], false, 0 TungstenExchange hashpartitioning(ss_item_sk#48) ConvertToUnsafe Scan ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_cdemo_sk#50,ss_promo_sk#54,ss_coupon_amt#65,ss_list_price#58,ss_sold_date_sk#46,ss_quantity#56,ss_item_sk#48] TungstenSort [ss_item_sk#71 ASC], false, 0 TungstenExchange hashpartitioning(ss_item_sk#71) ConvertToUnsafe Scan ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_item_sk#71] Code Generation: true =========================================================================================================================================== =1.4.1===================================================================================================================================== =========================================================================================================================================== == Parsed Logical Plan == 'Project ['t1.ss_quantity,'t1.ss_list_price,'t1.ss_coupon_amt,'t1.ss_cdemo_sk,'t1.ss_item_sk,'t1.ss_promo_sk,'t1.ss_sold_date_sk] 'Filter (('t1.ss_sold_date_sk >= 2450815) && ('t1.ss_sold_date_sk <= 2451179)) 'Join Inner, Some(('t1.ss_item_sk = 't2.ss_item_sk)) 'UnresolvedRelation [store_sales], Some(t1) 'UnresolvedRelation [store_sales], Some(t2) == Analyzed Logical Plan == ss_quantity: int, ss_list_price: decimal(7,2), ss_coupon_amt: decimal(7,2), ss_cdemo_sk: int, ss_item_sk: int, ss_promo_sk: int, ss_sold_date_sk: int Project [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0] Filter ((ss_sold_date_sk#0 >= 2450815) && (ss_sold_date_sk#0 <= 2451179)) Join Inner, Some((ss_item_sk#2 = ss_item_sk#25)) Subquery t1 Subquery store_sales Relation[ss_sold_date_sk#0,ss_sold_time_sk#1,ss_item_sk#2,ss_customer_sk#3,ss_cdemo_sk#4,ss_hdemo_sk#5,ss_addr_sk#6,ss_store_sk#7,ss_promo_sk#8,ss_ticket_number#9,ss_quantity#10,ss_wholesale_cost#11,ss_list_price#12,ss_sales_price#13,ss_ext_discount_amt#14,ss_ext_sales_price#15,ss_ext_wholesale_cost#16,ss_ext_list_price#17,ss_ext_tax#18,ss_coupon_amt#19,ss_net_paid#20,ss_net_paid_inc_tax#21,ss_net_profit#22] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7 Subquery t2 Subquery store_sales Relation[ss_sold_date_sk#23,ss_sold_time_sk#24,ss_item_sk#25,ss_customer_sk#26,ss_cdemo_sk#27,ss_hdemo_sk#28,ss_addr_sk#29,ss_store_sk#30,ss_promo_sk#31,ss_ticket_number#32,ss_quantity#33,ss_wholesale_cost#34,ss_list_price#35,ss_sales_price#36,ss_ext_discount_amt#37,ss_ext_sales_price#38,ss_ext_wholesale_cost#39,ss_ext_list_price#40,ss_ext_tax#41,ss_coupon_amt#42,ss_net_paid#43,ss_net_paid_inc_tax#44,ss_net_profit#45] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7 == Optimized Logical Plan == Project [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0] Join Inner, Some((ss_item_sk#2 = ss_item_sk#25)) Project [ss_list_price#12,ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0] Filter ((ss_sold_date_sk#0 >= 2450815) && (ss_sold_date_sk#0 <= 2451179)) Relation[ss_sold_date_sk#0,ss_sold_time_sk#1,ss_item_sk#2,ss_customer_sk#3,ss_cdemo_sk#4,ss_hdemo_sk#5,ss_addr_sk#6,ss_store_sk#7,ss_promo_sk#8,ss_ticket_number#9,ss_quantity#10,ss_wholesale_cost#11,ss_list_price#12,ss_sales_price#13,ss_ext_discount_amt#14,ss_ext_sales_price#15,ss_ext_wholesale_cost#16,ss_ext_list_price#17,ss_ext_tax#18,ss_coupon_amt#19,ss_net_paid#20,ss_net_paid_inc_tax#21,ss_net_profit#22] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7 Project [ss_item_sk#25] Relation[ss_sold_date_sk#23,ss_sold_time_sk#24,ss_item_sk#25,ss_customer_sk#26,ss_cdemo_sk#27,ss_hdemo_sk#28,ss_addr_sk#29,ss_store_sk#30,ss_promo_sk#31,ss_ticket_number#32,ss_quantity#33,ss_wholesale_cost#34,ss_list_price#35,ss_sales_price#36,ss_ext_discount_amt#37,ss_ext_sales_price#38,ss_ext_wholesale_cost#39,ss_ext_list_price#40,ss_ext_tax#41,ss_coupon_amt#42,ss_net_paid#43,ss_net_paid_inc_tax#44,ss_net_profit#45] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7 == Physical Plan == Project [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0] ShuffledHashJoin [ss_item_sk#2], [ss_item_sk#25], BuildRight Exchange (HashPartitioning 600) PhysicalRDD [ss_list_price#12,ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0], UnionRDD[1093] at rdd at <console>:45 Exchange (HashPartitioning 600) PhysicalRDD [ss_item_sk#25], UnionRDD[5108] at rdd at <console>:45 Code Generation: false At 2015-09-11 02:02:45, "Michael Armbrust" <mich...@databricks.com> wrote: I've been running TPC-DS SF=1500 daily on Spark 1.4.1 and Spark 1.5 on S3, so this is surprising. In my experiments Spark 1.5 is either the same or faster than 1.4 with only small exceptions. A few thoughts, - 600 partitions is probably way too many for 6G of data. - Providing the output of explain for both runs would be helpful whenever reporting performance changes. On Thu, Sep 10, 2015 at 1:24 AM, Todd <bit1...@163.com> wrote: Hi, I am using data generated with sparksqlperf(https://github.com/databricks/spark-sql-perf) to test the spark sql performance (spark on yarn, with 10 nodes) with the following code (The table store_sales is about 90 million records, 6G in size) val outputDir="hdfs://tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales" val name="store_sales" sqlContext.sql( s""" |CREATE TEMPORARY TABLE ${name} |USING org.apache.spark.sql.parquet |OPTIONS ( | path '${outputDir}' |) """.stripMargin) val sql=""" |select | t1.ss_quantity, | t1.ss_list_price, | t1.ss_coupon_amt, | t1.ss_cdemo_sk, | t1.ss_item_sk, | t1.ss_promo_sk, | t1.ss_sold_date_sk |from store_sales t1 join store_sales t2 on t1.ss_item_sk = t2.ss_item_sk |where | t1.ss_sold_date_sk between 2450815 and 2451179 """.stripMargin val df = sqlContext.sql(sql) df.rdd.foreach(row=>Unit) With 1.4.1, I can finish the query in 6 minutes, but I need 10+ minutes with 1.5. The configuration are basically the same, since I copy the configuration from 1.4.1 to 1.5: sparkVersion 1.4.1 1.5.0 scaleFactor 30 30 spark.sql.shuffle.partitions 600 600 spark.sql.sources.partitionDiscovery.enabled true true spark.default.parallelism 200 200 spark.driver.memory 4G 4G 4G spark.executor.memory 4G 4G spark.executor.instances 10 10 spark.shuffle.consolidateFiles true true spark.storage.memoryFraction 0.4 0.4 spark.executor.cores 3 3 I am not sure where is going wrong,any ideas?