Thanks, Michael, for the reply.
Below are the SQL plans for 1.5.0 and 1.4.1. 1.5.0 uses SortMergeJoin, while 1.4.1
uses ShuffledHashJoin.
In this case, it seems that the hash join performs better than the sort-merge join.
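To make the difference between the two strategies concrete, here is a simplified, single-partition sketch in plain Scala (no Spark APIs; `hashJoin` and `sortMergeJoin` are hypothetical names). It only models the per-partition join step that runs after the exchange has hash-partitioned both sides by ss_item_sk, and it ignores Tungsten's binary rows, memory management and spilling.

```scala
// Rows are (key, value) pairs; both functions return the inner-join result.

// ShuffledHashJoin-style: build a hash table on one side (the 1.4.1 plan says
// BuildRight), then stream the other side and probe the table.
def hashJoin[V, W](left: Seq[(Int, V)], right: Seq[(Int, W)]): Seq[(Int, V, W)] = {
  val table: Map[Int, Seq[W]] =
    right.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
  for {
    (k, v) <- left
    w <- table.getOrElse(k, Seq.empty)
  } yield (k, v, w)
}

// SortMergeJoin-style: sort both sides by key, then walk them in step.
def sortMergeJoin[V, W](left: Seq[(Int, V)], right: Seq[(Int, W)]): Seq[(Int, V, W)] = {
  val l = left.sortBy(_._1).toVector
  val r = right.sortBy(_._1).toVector
  val out = Vector.newBuilder[(Int, V, W)]
  var i = 0
  var j = 0
  while (i < l.length && j < r.length) {
    val k = l(i)._1
    val rk = r(j)._1
    if (k < rk) i += 1
    else if (k > rk) j += 1
    else {
      // Find the run of equal keys on each side and emit their cross product.
      var iEnd = i
      while (iEnd < l.length && l(iEnd)._1 == k) iEnd += 1
      var jEnd = j
      while (jEnd < r.length && r(jEnd)._1 == k) jEnd += 1
      for (a <- i until iEnd; b <- j until jEnd) out += ((k, l(a)._2, r(b)._2))
      i = iEnd
      j = jEnd
    }
  }
  out.result()
}

val left  = Seq(1 -> "a", 2 -> "b", 2 -> "c", 4 -> "d")
val right = Seq(2 -> "x", 3 -> "y", 4 -> "z", 4 -> "w")
println(hashJoin(left, right))
println(sortMergeJoin(left, right))
```

Both produce the same rows. Roughly, the hash join skips sorting but needs the build side of each partition (here the narrow ss_item_sk-only side) to fit in memory, while sort-merge join pays an O(n log n) sort on both sides but can spill, which is presumably why 1.5 favours it for large joins.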
===========================================================================================================================================
=1.5.0=====================================================================================================================================
===========================================================================================================================================
== Parsed Logical Plan ==
'Project [unresolvedalias('t1.ss_quantity),unresolvedalias('t1.ss_list_price),unresolvedalias('t1.ss_coupon_amt),unresolvedalias('t1.ss_cdemo_sk),unresolvedalias('t1.ss_item_sk),unresolvedalias('t1.ss_promo_sk),unresolvedalias('t1.ss_sold_date_sk)]
 'Filter (('t1.ss_sold_date_sk >= 2450815) && ('t1.ss_sold_date_sk <= 2451179))
  'Join Inner, Some(('t1.ss_item_sk = 't2.ss_item_sk))
   'UnresolvedRelation [store_sales], Some(t1)
   'UnresolvedRelation [store_sales], Some(t2)
== Analyzed Logical Plan ==
ss_quantity: int, ss_list_price: decimal(7,2), ss_coupon_amt: decimal(7,2), ss_cdemo_sk: int, ss_item_sk: int, ss_promo_sk: int, ss_sold_date_sk: int
Project [ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46]
 Filter ((ss_sold_date_sk#46 >= 2450815) && (ss_sold_date_sk#46 <= 2451179))
  Join Inner, Some((ss_item_sk#48 = ss_item_sk#71))
   Subquery t1
    Subquery store_sales
     Relation[ss_sold_date_sk#46,ss_sold_time_sk#47,ss_item_sk#48,ss_customer_sk#49,ss_cdemo_sk#50,ss_hdemo_sk#51,ss_addr_sk#52,ss_store_sk#53,ss_promo_sk#54,ss_ticket_number#55,ss_quantity#56,ss_wholesale_cost#57,ss_list_price#58,ss_sales_price#59,ss_ext_discount_amt#60,ss_ext_sales_price#61,ss_ext_wholesale_cost#62,ss_ext_list_price#63,ss_ext_tax#64,ss_coupon_amt#65,ss_net_paid#66,ss_net_paid_inc_tax#67,ss_net_profit#68] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]
   Subquery t2
    Subquery store_sales
     Relation[ss_sold_date_sk#69,ss_sold_time_sk#70,ss_item_sk#71,ss_customer_sk#72,ss_cdemo_sk#73,ss_hdemo_sk#74,ss_addr_sk#75,ss_store_sk#76,ss_promo_sk#77,ss_ticket_number#78,ss_quantity#79,ss_wholesale_cost#80,ss_list_price#81,ss_sales_price#82,ss_ext_discount_amt#83,ss_ext_sales_price#84,ss_ext_wholesale_cost#85,ss_ext_list_price#86,ss_ext_tax#87,ss_coupon_amt#88,ss_net_paid#89,ss_net_paid_inc_tax#90,ss_net_profit#91] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]
== Optimized Logical Plan ==
Project [ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46]
 Join Inner, Some((ss_item_sk#48 = ss_item_sk#71))
  Project [ss_cdemo_sk#50,ss_promo_sk#54,ss_coupon_amt#65,ss_list_price#58,ss_sold_date_sk#46,ss_quantity#56,ss_item_sk#48]
   Filter ((ss_sold_date_sk#46 >= 2450815) && (ss_sold_date_sk#46 <= 2451179))
    Relation[ss_sold_date_sk#46,ss_sold_time_sk#47,ss_item_sk#48,ss_customer_sk#49,ss_cdemo_sk#50,ss_hdemo_sk#51,ss_addr_sk#52,ss_store_sk#53,ss_promo_sk#54,ss_ticket_number#55,ss_quantity#56,ss_wholesale_cost#57,ss_list_price#58,ss_sales_price#59,ss_ext_discount_amt#60,ss_ext_sales_price#61,ss_ext_wholesale_cost#62,ss_ext_list_price#63,ss_ext_tax#64,ss_coupon_amt#65,ss_net_paid#66,ss_net_paid_inc_tax#67,ss_net_profit#68] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]
  Project [ss_item_sk#71]
   Relation[ss_sold_date_sk#69,ss_sold_time_sk#70,ss_item_sk#71,ss_customer_sk#72,ss_cdemo_sk#73,ss_hdemo_sk#74,ss_addr_sk#75,ss_store_sk#76,ss_promo_sk#77,ss_ticket_number#78,ss_quantity#79,ss_wholesale_cost#80,ss_list_price#81,ss_sales_price#82,ss_ext_discount_amt#83,ss_ext_sales_price#84,ss_ext_wholesale_cost#85,ss_ext_list_price#86,ss_ext_tax#87,ss_coupon_amt#88,ss_net_paid#89,ss_net_paid_inc_tax#90,ss_net_profit#91] ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales]
== Physical Plan ==
TungstenProject [ss_quantity#56,ss_list_price#58,ss_coupon_amt#65,ss_cdemo_sk#50,ss_item_sk#48,ss_promo_sk#54,ss_sold_date_sk#46]
 SortMergeJoin [ss_item_sk#48], [ss_item_sk#71]
  TungstenSort [ss_item_sk#48 ASC], false, 0
   TungstenExchange hashpartitioning(ss_item_sk#48)
    ConvertToUnsafe
     Scan ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_cdemo_sk#50,ss_promo_sk#54,ss_coupon_amt#65,ss_list_price#58,ss_sold_date_sk#46,ss_quantity#56,ss_item_sk#48]
  TungstenSort [ss_item_sk#71 ASC], false, 0
   TungstenExchange hashpartitioning(ss_item_sk#71)
    ConvertToUnsafe
     Scan ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_item_sk#71]
Code Generation: true
===========================================================================================================================================
=1.4.1=====================================================================================================================================
===========================================================================================================================================
== Parsed Logical Plan ==
'Project ['t1.ss_quantity,'t1.ss_list_price,'t1.ss_coupon_amt,'t1.ss_cdemo_sk,'t1.ss_item_sk,'t1.ss_promo_sk,'t1.ss_sold_date_sk]
 'Filter (('t1.ss_sold_date_sk >= 2450815) && ('t1.ss_sold_date_sk <= 2451179))
  'Join Inner, Some(('t1.ss_item_sk = 't2.ss_item_sk))
   'UnresolvedRelation [store_sales], Some(t1)
   'UnresolvedRelation [store_sales], Some(t2)
== Analyzed Logical Plan ==
ss_quantity: int, ss_list_price: decimal(7,2), ss_coupon_amt: decimal(7,2), ss_cdemo_sk: int, ss_item_sk: int, ss_promo_sk: int, ss_sold_date_sk: int
Project [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
 Filter ((ss_sold_date_sk#0 >= 2450815) && (ss_sold_date_sk#0 <= 2451179))
  Join Inner, Some((ss_item_sk#2 = ss_item_sk#25))
   Subquery t1
    Subquery store_sales
     Relation[ss_sold_date_sk#0,ss_sold_time_sk#1,ss_item_sk#2,ss_customer_sk#3,ss_cdemo_sk#4,ss_hdemo_sk#5,ss_addr_sk#6,ss_store_sk#7,ss_promo_sk#8,ss_ticket_number#9,ss_quantity#10,ss_wholesale_cost#11,ss_list_price#12,ss_sales_price#13,ss_ext_discount_amt#14,ss_ext_sales_price#15,ss_ext_wholesale_cost#16,ss_ext_list_price#17,ss_ext_tax#18,ss_coupon_amt#19,ss_net_paid#20,ss_net_paid_inc_tax#21,ss_net_profit#22] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7
   Subquery t2
    Subquery store_sales
     Relation[ss_sold_date_sk#23,ss_sold_time_sk#24,ss_item_sk#25,ss_customer_sk#26,ss_cdemo_sk#27,ss_hdemo_sk#28,ss_addr_sk#29,ss_store_sk#30,ss_promo_sk#31,ss_ticket_number#32,ss_quantity#33,ss_wholesale_cost#34,ss_list_price#35,ss_sales_price#36,ss_ext_discount_amt#37,ss_ext_sales_price#38,ss_ext_wholesale_cost#39,ss_ext_list_price#40,ss_ext_tax#41,ss_coupon_amt#42,ss_net_paid#43,ss_net_paid_inc_tax#44,ss_net_profit#45] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7
== Optimized Logical Plan ==
Project [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
 Join Inner, Some((ss_item_sk#2 = ss_item_sk#25))
  Project [ss_list_price#12,ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0]
   Filter ((ss_sold_date_sk#0 >= 2450815) && (ss_sold_date_sk#0 <= 2451179))
    Relation[ss_sold_date_sk#0,ss_sold_time_sk#1,ss_item_sk#2,ss_customer_sk#3,ss_cdemo_sk#4,ss_hdemo_sk#5,ss_addr_sk#6,ss_store_sk#7,ss_promo_sk#8,ss_ticket_number#9,ss_quantity#10,ss_wholesale_cost#11,ss_list_price#12,ss_sales_price#13,ss_ext_discount_amt#14,ss_ext_sales_price#15,ss_ext_wholesale_cost#16,ss_ext_list_price#17,ss_ext_tax#18,ss_coupon_amt#19,ss_net_paid#20,ss_net_paid_inc_tax#21,ss_net_profit#22] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7
  Project [ss_item_sk#25]
   Relation[ss_sold_date_sk#23,ss_sold_time_sk#24,ss_item_sk#25,ss_customer_sk#26,ss_cdemo_sk#27,ss_hdemo_sk#28,ss_addr_sk#29,ss_store_sk#30,ss_promo_sk#31,ss_ticket_number#32,ss_quantity#33,ss_wholesale_cost#34,ss_list_price#35,ss_sales_price#36,ss_ext_discount_amt#37,ss_ext_sales_price#38,ss_ext_wholesale_cost#39,ss_ext_list_price#40,ss_ext_tax#41,ss_coupon_amt#42,ss_net_paid#43,ss_net_paid_inc_tax#44,ss_net_profit#45] org.apache.spark.sql.parquet.ParquetRelation2@df0ab6a7
== Physical Plan ==
Project [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
 ShuffledHashJoin [ss_item_sk#2], [ss_item_sk#25], BuildRight
  Exchange (HashPartitioning 600)
   PhysicalRDD [ss_list_price#12,ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0], UnionRDD[1093] at rdd at <console>:45
  Exchange (HashPartitioning 600)
   PhysicalRDD [ss_item_sk#25], UnionRDD[5108] at rdd at <console>:45
Code Generation: false
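One way to check whether the join strategy alone accounts for the gap would be to rerun the query on 1.5 with sort-merge join turned off. A spark-shell sketch, reusing `sqlContext` and the `sql` string from the test code quoted below; it assumes the setting `spark.sql.planner.sortMergeJoin` (default true in 1.5, false in 1.4) still selects between the two strategies in 1.5.0, which is worth confirming against the 1.5 configuration docs. `dfHash` is a hypothetical name.

```scala
// spark-shell on Spark 1.5.0; `sqlContext` and `sql` come from the original test code.
// Assumption: setting this to false makes the 1.5 planner choose ShuffledHashJoin.
sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false")

val dfHash = sqlContext.sql(sql)
dfHash.explain(true)        // check that the physical plan now shows ShuffledHashJoin
dfHash.rdd.foreach(_ => ()) // same full evaluation as the original test

// Restore the 1.5 default afterwards.
sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
```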
At 2015-09-11 02:02:45, "Michael Armbrust" wrote:
I've been running TPC-DS at SF=1500 daily on Spark 1.4.1 and Spark 1.5 on S3, so
this is surprising. In my experiments, Spark 1.5 is either as fast as or faster
than 1.4, with only small exceptions. A few thoughts:
- 600 partitions is probably way too many for 6 GB of data.
- Providing the output of explain for both runs would be helpful whenever
reporting performance changes.
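For a sense of scale on the first point: 6 GB spread over 600 shuffle partitions is only about 10 MB per partition (using the on-disk Parquet size as a rough proxy; the data actually shuffled by this query will differ). The hypothetical helper below picks a partition count from a target partition size, rounded up to whole waves of task slots. The 128 MB target is an illustrative assumption, not a recommendation from this thread; the 30 slots come from the configuration table (10 executors x 3 cores).

```scala
// Hypothetical helper: choose a shuffle partition count so each partition holds
// roughly `targetBytes`, rounded up to a whole number of waves of task slots.
def suggestedPartitions(totalBytes: Long, targetBytes: Long, taskSlots: Int): Int = {
  val bySize = math.max(1L, (totalBytes + targetBytes - 1) / targetBytes) // ceiling division
  val waves = (bySize + taskSlots - 1) / taskSlots                         // ceiling division
  (waves * taskSlots).toInt
}

val MB = 1024L * 1024
val GB = 1024L * MB

// Bytes per partition with spark.sql.shuffle.partitions=600 and ~6 GB of input.
val current = 6 * GB / 600
println(s"~${current / MB} MB per partition at 600 partitions")

val n = suggestedPartitions(6 * GB, 128 * MB, 30)
println(s"suggested: $n partitions")
```

In spark-shell the result would be applied with `sqlContext.setConf("spark.sql.shuffle.partitions", n.toString)` before running the query; rounding to full waves simply avoids a last, mostly idle wave of tasks.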
On Thu, Sep 10, 2015 at 1:24 AM, Todd wrote:
Hi,
I am using data generated with
spark-sql-perf (https://github.com/databricks/spark-sql-perf) to test Spark
SQL performance (Spark on YARN, 10 nodes) with the following code. The
store_sales table has about 90 million records and is about 6 GB in size.
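// Run in spark-shell (Spark 1.4.1 / 1.5.0), where sqlContext is predefined.
val outputDir = "hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales"
val name = "store_sales"

// Register the Parquet data generated by spark-sql-perf as a temporary table.
sqlContext.sql(
  s"""
    |CREATE TEMPORARY TABLE ${name}
    |USING org.apache.spark.sql.parquet
    |OPTIONS (
    |  path '${outputDir}'
    |)
  """.stripMargin)

// Self-join store_sales on ss_item_sk, keeping a 365-day range of ss_sold_date_sk from t1.
val sql = """
  |select
  |  t1.ss_quantity,
  |  t1.ss_list_price,
  |  t1.ss_coupon_amt,
  |  t1.ss_cdemo_sk,
  |  t1.ss_item_sk,
  |  t1.ss_promo_sk,
  |  t1.ss_sold_date_sk
  |from store_sales t1 join store_sales t2 on t1.ss_item_sk = t2.ss_item_sk
  |where
  |  t1.ss_sold_date_sk between 2450815 and 2451179
""".stripMargin

val df = sqlContext.sql(sql)
// Force full evaluation of the query without collecting rows to the driver.
df.rdd.foreach(_ => ())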
With 1.4.1 the query finishes in 6 minutes, but with 1.5 it takes more than
10 minutes.
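To compare the two versions with less noise, it can help to time only the action, with identical code in both shells. A minimal sketch in plain Scala; `timed` is a hypothetical helper, not a Spark API, and it measures wall-clock time on the driver only.

```scala
// Hypothetical helper: runs `body`, prints the elapsed wall-clock time, and
// returns the result together with the elapsed seconds.
def timed[T](label: String)(body: => T): (T, Double) = {
  val start = System.nanoTime()
  val result = body
  val seconds = (System.nanoTime() - start) / 1e9
  println(f"$label took $seconds%.1f s")
  (result, seconds)
}

// In spark-shell: timed("store_sales self-join") { df.rdd.foreach(_ => ()) }
val (total, secs) = timed("local sum") { (1 to 1000000).map(_.toLong).sum }
```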
The configurations are basically the same, since I copied the configuration from
1.4.1 to 1.5:
sparkVersion                                  1.4.1  1.5.0
scaleFactor                                   30     30
spark.sql.shuffle.partitions                  600    600
spark.sql.sources.partitionDiscovery.enabled  true   true
spark.default.parallelism                     200    200
spark.driver.memory                           4G     4G
spark.executor.memory                         4G     4G
spark.executor.instances                      10     10
spark.shuffle.consolidateFiles                true   true
spark.storage.memoryFraction                  0.4    0.4
spark.executor.cores                          3      3
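To confirm that the two shells really run with the same settings, one could dump the effective configuration in each and diff the output. A spark-shell sketch using the predefined `sc` and `sqlContext`; note that SQL settings left at their defaults typically do not appear in `getAllConfs`, so a default that changed between releases can still differ even when both listings match.

```scala
// spark-shell, run once on 1.4.1 and once on 1.5.0, then diff the two outputs.
// Spark core settings in effect for this application:
sc.getConf.getAll.sorted.foreach { case (k, v) => println(s"$k=$v") }
// Spark SQL settings explicitly set in this SQLContext:
sqlContext.getAllConfs.toSeq.sorted.foreach { case (k, v) => println(s"$k=$v") }
```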
I am not sure what is going wrong. Any ideas?