Thanks Michael for the reply.
Below are the SQL plans for 1.5.0 and 1.4.1. 1.5.0 uses SortMergeJoin, while
1.4.1 uses ShuffledHashJoin.

In this case, the shuffled hash join seems to perform better than the sort-merge join.
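
If the join strategy is indeed the culprit, one way to confirm would be to force 1.5
back to the 1.4-style join and re-run the query. A minimal sketch, assuming the same
sqlContext and the same sql string from my original mail; to my knowledge,
spark.sql.planner.sortMergeJoin is the planner flag whose default changed from false
in 1.4.1 to true in 1.5.0:

// Sketch: prefer ShuffledHashJoin again, as in 1.4.1
sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false")
sqlContext.sql(sql).explain(true)        // confirm the physical plan changed
sqlContext.sql(sql).rdd.foreach(_ => ()) // re-time the query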


===========================================================================================================================================
=1.5.0=====================================================================================================================================
===========================================================================================================================================
== Parsed Logical Plan ==
'Project [unresolvedalias('t1.ss_quantity),unresolvedalias('t1.ss_list_price),unresolvedalias('t1.ss_coupon_amt),unresolvedalias('t1.ss_cdemo_sk),unresolvedalias('t1.ss_item_sk),unresolvedalias('t1.ss_promo_sk),unresolvedalias('t1.ss_sold_date_sk)]
 'Filter (('t1.ss_sold_date_sk >= 2450815) && ('t1.ss_sold_date_sk <= 2451179))
  'Join Inner, Some(('t1.ss_item_sk = 't2.ss_item_sk))
   'UnresolvedRelation [store_sales], Some(t1)
   'UnresolvedRelation [store_sales], Some(t2)

== Analyzed Logical Plan ==
ss_quantity: int, ss_list_price: decimal(7,2), ss_coupon_amt: decimal(7,2), ss_cdemo_sk: int, ss_item_sk: int, ss_promo_sk: int, ss_sold_date_sk: int
Project [ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46]
 Filter ((ss_sold_date_sk#46 >= 2450815) && (ss_sold_date_sk#46 <= 2451179))
  Join Inner, Some((ss_item_sk#48 = ss_item_sk#71))
   Subquery t1
    Subquery store_sales
     Relation[ss_sold_date_sk#46,ss_sold_time_sk#47,ss_item_sk#48,ss_customer_sk#49,ss_cdemo_sk#50,ss_hdemo_sk#51,ss_addr_sk#52,ss_store_sk#53,ss_promo_sk#54,ss_ticket_number#55,ss_quantity#56,ss_wholesale_cost#57,ss_list_price#58,ss_sales_price#59,ss_ext_discount_amt#60,ss_ext_sales_price#61,ss_ext_wholesale_cost#62,ss_ext_list_price#63,ss_ext_tax#64,ss_coupon_amt#65,ss_net_paid#66,ss_net_paid_inc_tax#67,ss_net_profit#68] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]
   Subquery t2
    Subquery store_sales
     Relation[ss_sold_date_sk#69,ss_sold_time_sk#70,ss_item_sk#71,ss_customer_sk#72,ss_cdemo_sk#73,ss_hdemo_sk#74,ss_addr_sk#75,ss_store_sk#76,ss_promo_sk#77,ss_ticket_number#78,ss_quantity#79,ss_wholesale_cost#80,ss_list_price#81,ss_sales_price#82,ss_ext_discount_amt#83,ss_ext_sales_price#84,ss_ext_wholesale_cost#85,ss_ext_list_price#86,ss_ext_tax#87,ss_coupon_amt#88,ss_net_paid#89,ss_net_paid_inc_tax#90,ss_net_profit#91] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]

== Optimized Logical Plan ==
Project [ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46]
 Join Inner, Some((ss_item_sk#48 = ss_item_sk#71))
  Project [ss_cdemo_sk#50,ss_promo_sk#54,ss_coupon_amt#65,ss_list_price#58,ss_sold_date_sk#46,ss_quantity#56,ss_item_sk#48]
   Filter ((ss_sold_date_sk#46 >= 2450815) && (ss_sold_date_sk#46 <= 2451179))
    Relation[ss_sold_date_sk#46,ss_sold_time_sk#47,ss_item_sk#48,ss_customer_sk#49,ss_cdemo_sk#50,ss_hdemo_sk#51,ss_addr_sk#52,ss_store_sk#53,ss_promo_sk#54,ss_ticket_number#55,ss_quantity#56,ss_wholesale_cost#57,ss_list_price#58,ss_sales_price#59,ss_ext_discount_amt#60,ss_ext_sales_price#61,ss_ext_wholesale_cost#62,ss_ext_list_price#63,ss_ext_tax#64,ss_coupon_amt#65,ss_net_paid#66,ss_net_paid_inc_tax#67,ss_net_profit#68] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]
  Project [ss_item_sk#71]
   Relation[ss_sold_date_sk#69,ss_sold_time_sk#70,ss_item_sk#71,ss_customer_sk#72,ss_cdemo_sk#73,ss_hdemo_sk#74,ss_addr_sk#75,ss_store_sk#76,ss_promo_sk#77,ss_ticket_number#78,ss_quantity#79,ss_wholesale_cost#80,ss_list_price#81,ss_sales_price#82,ss_ext_discount_amt#83,ss_ext_sales_price#84,ss_ext_wholesale_cost#85,ss_ext_list_price#86,ss_ext_tax#87,ss_coupon_amt#88,ss_net_paid#89,ss_net_paid_inc_tax#90,ss_net_profit#91] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]

== Physical Plan ==
TungstenProject [ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46]
 SortMergeJoin [ss_item_sk#48], [ss_item_sk#71]
  TungstenSort [ss_item_sk#48 ASC], false, 0
   TungstenExchange hashpartitioning(ss_item_sk#48)
    ConvertToUnsafe
     Scan ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_cdemo_sk#50,ss_promo_sk#54,ss_coupon_amt#65,ss_list_price#58,ss_sold_date_sk#46,ss_quantity#56,ss_item_sk#48]
  TungstenSort [ss_item_sk#71 ASC], false, 0
   TungstenExchange hashpartitioning(ss_item_sk#71)
    ConvertToUnsafe
     Scan ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_item_sk#71]

Code Generation: true


===========================================================================================================================================
=1.4.1=====================================================================================================================================
===========================================================================================================================================
== Parsed Logical Plan ==
'Project ['t1.ss_quantity,'t1.ss_list_price,'t1.ss_coupon_amt,'t1.ss_cdemo_sk,'t1.ss_item_sk,'t1.ss_promo_sk,'t1.ss_sold_date_sk]
 'Filter (('t1.ss_sold_date_sk >= 2450815) && ('t1.ss_sold_date_sk <= 2451179))
  'Join Inner, Some(('t1.ss_item_sk = 't2.ss_item_sk))
   'UnresolvedRelation [store_sales], Some(t1)
   'UnresolvedRelation [store_sales], Some(t2)

== Analyzed Logical Plan ==
ss_quantity: int, ss_list_price: decimal(7,2), ss_coupon_amt: decimal(7,2), ss_cdemo_sk: int, ss_item_sk: int, ss_promo_sk: int, ss_sold_date_sk: int
Project [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
 Filter ((ss_sold_date_sk#0 >= 2450815) && (ss_sold_date_sk#0 <= 2451179))
  Join Inner, Some((ss_item_sk#2 = ss_item_sk#25))
   Subquery t1
    Subquery store_sales
     Relation[ss_sold_date_sk#0,ss_sold_time_sk#1,ss_item_sk#2,ss_customer_sk#3,ss_cdemo_sk#4,ss_hdemo_sk#5,ss_addr_sk#6,ss_store_sk#7,ss_promo_sk#8,ss_ticket_number#9,ss_quantity#10,ss_wholesale_cost#11,ss_list_price#12,ss_sales_price#13,ss_ext_discount_amt#14,ss_ext_sales_price#15,ss_ext_wholesale_cost#16,ss_ext_list_price#17,ss_ext_tax#18,ss_coupon_amt#19,ss_net_paid#20,ss_net_paid_inc_tax#21,ss_net_profit#22] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7
   Subquery t2
    Subquery store_sales
     Relation[ss_sold_date_sk#23,ss_sold_time_sk#24,ss_item_sk#25,ss_customer_sk#26,ss_cdemo_sk#27,ss_hdemo_sk#28,ss_addr_sk#29,ss_store_sk#30,ss_promo_sk#31,ss_ticket_number#32,ss_quantity#33,ss_wholesale_cost#34,ss_list_price#35,ss_sales_price#36,ss_ext_discount_amt#37,ss_ext_sales_price#38,ss_ext_wholesale_cost#39,ss_ext_list_price#40,ss_ext_tax#41,ss_coupon_amt#42,ss_net_paid#43,ss_net_paid_inc_tax#44,ss_net_profit#45] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7

== Optimized Logical Plan ==
Project [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
 Join Inner, Some((ss_item_sk#2 = ss_item_sk#25))
  Project [ss_list_price#12,ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0]
   Filter ((ss_sold_date_sk#0 >= 2450815) && (ss_sold_date_sk#0 <= 2451179))
    Relation[ss_sold_date_sk#0,ss_sold_time_sk#1,ss_item_sk#2,ss_customer_sk#3,ss_cdemo_sk#4,ss_hdemo_sk#5,ss_addr_sk#6,ss_store_sk#7,ss_promo_sk#8,ss_ticket_number#9,ss_quantity#10,ss_wholesale_cost#11,ss_list_price#12,ss_sales_price#13,ss_ext_discount_amt#14,ss_ext_sales_price#15,ss_ext_wholesale_cost#16,ss_ext_list_price#17,ss_ext_tax#18,ss_coupon_amt#19,ss_net_paid#20,ss_net_paid_inc_tax#21,ss_net_profit#22] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7
  Project [ss_item_sk#25]
   Relation[ss_sold_date_sk#23,ss_sold_time_sk#24,ss_item_sk#25,ss_customer_sk#26,ss_cdemo_sk#27,ss_hdemo_sk#28,ss_addr_sk#29,ss_store_sk#30,ss_promo_sk#31,ss_ticket_number#32,ss_quantity#33,ss_wholesale_cost#34,ss_list_price#35,ss_sales_price#36,ss_ext_discount_amt#37,ss_ext_sales_price#38,ss_ext_wholesale_cost#39,ss_ext_list_price#40,ss_ext_tax#41,ss_coupon_amt#42,ss_net_paid#43,ss_net_paid_inc_tax#44,ss_net_profit#45] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7

== Physical Plan ==
Project [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
 ShuffledHashJoin [ss_item_sk#2], [ss_item_sk#25], BuildRight
  Exchange (HashPartitioning 600)
   PhysicalRDD [ss_list_price#12,ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0], UnionRDD[1093] at rdd at <console>:45
  Exchange (HashPartitioning 600)
   PhysicalRDD [ss_item_sk#25], UnionRDD[5108] at rdd at <console>:45

Code Generation: false

At 2015-09-11 02:02:45, "Michael Armbrust" <mich...@databricks.com> wrote:

I've been running TPC-DS SF=1500 daily on Spark 1.4.1 and Spark 1.5 on S3, so
this is surprising.  In my experiments Spark 1.5 is either the same or faster
than 1.4 with only small exceptions.  A few thoughts:


 - 600 partitions is probably way too many for 6G of data.
 - Providing the output of explain for both runs would be helpful whenever 
reporting performance changes.
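
Lowering the partition count before running the join is a one-line change; a sketch,
with 200 chosen only as an illustrative smaller value for a ~6G input:

sqlContext.setConf("spark.sql.shuffle.partitions", "200")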


On Thu, Sep 10, 2015 at 1:24 AM, Todd <bit1...@163.com> wrote:

Hi,

I am using data generated with spark-sql-perf (https://github.com/databricks/spark-sql-perf)
to test Spark SQL performance (Spark on YARN, with 10 nodes) with the following code
(the table store_sales has about 90 million records, 6G in size):
 
val outputDir = "hdfs://tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales"
val name = "store_sales"

sqlContext.sql(
  s"""
     |CREATE TEMPORARY TABLE ${name}
     |USING org.apache.spark.sql.parquet
     |OPTIONS (
     |  path '${outputDir}'
     |)
   """.stripMargin)

val sql = """
    |select
    |  t1.ss_quantity,
    |  t1.ss_list_price,
    |  t1.ss_coupon_amt,
    |  t1.ss_cdemo_sk,
    |  t1.ss_item_sk,
    |  t1.ss_promo_sk,
    |  t1.ss_sold_date_sk
    |from store_sales t1 join store_sales t2 on t1.ss_item_sk = t2.ss_item_sk
    |where
    |  t1.ss_sold_date_sk between 2450815 and 2451179
  """.stripMargin

val df = sqlContext.sql(sql)
df.rdd.foreach(row => Unit)
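
Here df.rdd.foreach(row => Unit) only forces the whole query to execute without
collecting anything to the driver; an equivalent way to time it end-to-end
(a sketch, using the same sqlContext) is:

val start = System.nanoTime()
sqlContext.sql(sql).rdd.foreach(_ => ())
println(s"elapsed: ${(System.nanoTime() - start) / 1e9} s")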

With 1.4.1, the query finishes in 6 minutes, but it takes 10+ minutes with 1.5.

The configurations are basically the same, since I copied them from 1.4.1 to 1.5:

sparkVersion                                    1.4.1    1.5.0
scaleFactor                                     30       30
spark.sql.shuffle.partitions                    600      600
spark.sql.sources.partitionDiscovery.enabled    true     true
spark.default.parallelism                       200      200
spark.driver.memory                             4G       4G
spark.executor.memory                           4G       4G
spark.executor.instances                        10       10
spark.shuffle.consolidateFiles                  true     true
spark.storage.memoryFraction                    0.4      0.4
spark.executor.cores                            3        3

I am not sure what is going wrong, any ideas?



