Hi,
I'm trying to understand SortMergeJoin (SPARK-2213).
1) Once SortMergeJoin is enabled, will Spark ever use ShuffledHashJoin? For
example, in the code below, the two datasets have different numbers of
partitions, but Spark still does a SortMergeJoin after a "hashpartitioning".
CODE:
val sparkConf = new SparkConf()
.setAppName("SortMergeJoinTest")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.eventLog.enabled", "true")
.set("spark.sql.planner.sortMergeJoin","true")
sparkConf.setMaster("local-cluster[3,1,1024]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val inputpath = "input.gz.parquet"
val df1 = sqlContext.read.parquet(inputpath).repartition(3)
val df2 = sqlContext.read.parquet(inputpath).repartition(5)
val result = df1.join(df2.withColumnRenamed("foo","foo2"), $"foo" === $"foo2")
result.explain()
OUTPUT:
== Physical Plan ==
SortMergeJoin [foo#0], [foo2#8]
 TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
   ConvertToUnsafe
    Repartition 3, true
     Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
 TungstenSort [foo2#8 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#8)
   TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
    Repartition 5, true
     Scan ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]
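To make clear how I currently picture the "hashpartitioning" step in the plan above, here is a minimal plain-Scala sketch. It is not Spark's code: `partitionOf`, `hashPartition`, `keyLocations` and `coPartitioned` are names I made up, and the non-negative `hashCode` modulo is only my assumption about how a hash partitioner assigns rows to partitions.

```scala
object CoPartitionSketch {
  type Row = (String, Long) // (join key, payload); hypothetical row shape

  // Assumed partitioner: non-negative hash of the key modulo the partition count.
  def partitionOf(key: String, numPartitions: Int): Int = {
    val m = key.hashCode % numPartitions
    if (m < 0) m + numPartitions else m
  }

  // Keyed partitioning: all rows with the same key land in the same partition index.
  def hashPartition(rows: Seq[Row], numPartitions: Int): Vector[Seq[Row]] = {
    val grouped = rows.groupBy { case (key, _) => partitionOf(key, numPartitions) }
    Vector.tabulate(numPartitions)(i => grouped.getOrElse(i, Seq.empty))
  }

  // For each key, the set of partition indices in which it appears.
  def keyLocations(parts: Vector[Seq[Row]]): Map[String, Set[Int]] =
    parts.zipWithIndex
      .flatMap { case (rows, i) => rows.map { case (key, _) => key -> i } }
      .groupBy(_._1)
      .map { case (key, pairs) => key -> pairs.map(_._2).toSet }

  // Two sides can be joined partition by partition only if they have the same
  // number of partitions and every shared key lives at one and the same index.
  def coPartitioned(left: Vector[Seq[Row]], right: Vector[Seq[Row]]): Boolean = {
    val l = keyLocations(left)
    val r = keyLocations(right)
    left.length == right.length &&
      l.keySet.intersect(r.keySet).forall(k => l(k).size == 1 && l(k) == r(k))
  }
}
```

In this sketch, `coPartitioned(hashPartition(a, 4), hashPartition(b, 4))` always holds, while sides split into 3 and 5 partitions can never line up. That is my guess as to why an exchange appears on both sides of the join here.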
2) If both datasets were already partitioned and sorted the same way and
stored on the file system (e.g. by a previous job), is there a way to tell
Spark this so that it skips the "hashpartitioning" step? It looks like Spark
treats any dataset freshly read from the file system as having
UnknownPartitioning. In the example below, I join a DataFrame to itself, and
Spark still wants to hash-repartition both sides.
CODE:
...
val df1 = sqlContext.read.parquet(inputpath)
val result = df1.join(df1.withColumnRenamed("foo","foo2"), $"foo" === $"foo2")
result.explain()
OUTPUT:
== Physical Plan ==
SortMergeJoin [foo#0], [foo2#4]
 TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
   ConvertToUnsafe
    Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
 TungstenSort [foo2#4 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#4)
   ConvertToUnsafe
    Project [foo#5 AS foo2#4,bar#6L,somefield#7,anotherfield#8]
     Scan ParquetRelation[file:input.gz.parquet][foo#5,bar#6L,somefield#7,anotherfield#8]
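To show what I am hoping to get down to: if both inputs really are already sorted by the join key, I would expect the join itself to need only a single merge pass, roughly like the plain-Scala sketch below. This is an illustration of the general sort-merge technique, not Spark's implementation, and `mergeJoin` is a hypothetical helper of my own.

```scala
object MergeJoinSketch {
  // Inner equi-join of two inputs that are ALREADY sorted ascending by key.
  // No sorting or shuffling happens here; both cursors only move forward.
  def mergeJoin[A, B](left: Vector[(String, A)],
                      right: Vector[(String, B)]): Vector[(String, A, B)] = {
    val out = Vector.newBuilder[(String, A, B)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val cmp = left(i)._1.compareTo(right(j)._1)
      if (cmp < 0) i += 1        // left key is smaller: advance left
      else if (cmp > 0) j += 1   // right key is smaller: advance right
      else {
        val key = left(i)._1
        // Find the end of the run of rows with this key on the right side.
        var jEnd = j
        while (jEnd < right.length && right(jEnd)._1 == key) jEnd += 1
        // Pair every left row carrying this key with every right row in the run.
        while (i < left.length && left(i)._1 == key) {
          var k = j
          while (k < jEnd) {
            out += ((key, left(i)._2, right(k)._2))
            k += 1
          }
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }
}
```

The sketch is only correct when both inputs arrive sorted by key, which is exactly the property I would like to be able to declare to Spark for data written by an earlier job.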
Thanks.