1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
example, in the code below, the two datasets have different number of
partitions, but it still does a SortMerge join after a "hashpartitioning".
[Hao:] A distributed JOIN operation (either hash-based or sort-based) requires
that records with identical join keys be shuffled to the same “reducer”
node / task. hashpartitioning is just a strategy that tells the Spark shuffle
service how to achieve that; in theory, we could even use `RangePartitioning`
instead (but it’s less efficient, which is why we don’t choose it for JOIN).
So conceptually the JOIN operator doesn’t care much about the shuffle
strategy, as long as it satisfies the required data distribution.
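To make the co-location requirement concrete, here is a minimal sketch in plain Scala (no Spark dependency). It is a toy model, not Spark's actual shuffle code: `partitionOf` and `shuffle` are hypothetical helpers invented for illustration. It shows that when both sides are hashed on the join key with the same partition count, matching keys land in the same partition index, no matter how each side was laid out before.

```scala
// Toy model of hash partitioning; not Spark's actual implementation.
object HashPartitionSketch {
  // Map a key to a partition index in [0, numPartitions).
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Group records by their target partition, as a shuffle would.
  def shuffle[V](records: Seq[(String, V)], numPartitions: Int): Map[Int, Seq[(String, V)]] =
    records.groupBy { case (k, _) => partitionOf(k, numPartitions) }

  def main(args: Array[String]): Unit = {
    val left  = Seq("a" -> 1, "b" -> 2, "c" -> 3)
    val right = Seq("c" -> "x", "a" -> "y", "d" -> "z")
    val n = 4
    val l = shuffle(left, n)
    val r = shuffle(right, n)
    // Every key present on both sides lands in the same partition index,
    // so each partition can be joined locally by a single task.
    for (k <- Seq("a", "c")) {
      val p = partitionOf(k, n)
      println(s"key=$k partition=$p left=${l(p)} right=${r(p)}")
    }
  }
}
```

Any deterministic function of the key would give the same guarantee; hashing is simply a cheap way to get it.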
2) If both datasets have already been previously partitioned/sorted the same
and stored on the file system (e.g. in a previous job), is there a way to tell
Spark this so that it won't want to do a "hashpartitioning" on them? It looks
like Spark just considers datasets that have just been read from the file
system to have UnknownPartitioning. In the example below, I try to join a
dataframe to itself, and it still wants to hash repartition.
[Hao:] Take this as an example:
EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON a.key=b.key
JOIN src c ON b.key=c.key
== Physical Plan ==
TungstenProject [value#20,value#22,value#24]
 SortMergeJoin [key#21], [key#23]
  TungstenSort [key#21 ASC], false, 0
   TungstenProject [key#21,value#22,value#20]
    SortMergeJoin [key#19], [key#21]
     TungstenSort [key#19 ASC], false, 0
      TungstenExchange hashpartitioning(key#19,200)
       ConvertToUnsafe
        HiveTableScan [key#19,value#20], (MetastoreRelation default, src, Some(a))
     TungstenSort [key#21 ASC], false, 0
      TungstenExchange hashpartitioning(key#21,200)
       ConvertToUnsafe
        HiveTableScan [key#21,value#22], (MetastoreRelation default, src, Some(b))
  TungstenSort [key#23 ASC], false, 0
   TungstenExchange hashpartitioning(key#23,200)
    ConvertToUnsafe
     HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))
There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN src b
ON a.key=b.key”, because the data distribution does not change after that
join. So we can join another table (“JOIN src c ON b.key=c.key”) directly;
only table “c” needs to be repartitioned on “key”.
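The decision described above can be sketched as a toy model in plain Scala. All names here (`ToyPartitioning`, `ToyHash`, `ToyUnknown`, `needsExchange`) are hypothetical and only illustrate the idea; they are not Spark's planner classes, and the real rules are more involved.

```scala
// Toy model of the planner's decision; names are hypothetical, not Spark's classes.
object ExchangeSketch {
  sealed trait ToyPartitioning
  case object ToyUnknown extends ToyPartitioning
  final case class ToyHash(keys: Seq[String], numPartitions: Int) extends ToyPartitioning

  // A join on `joinKeys` needs rows clustered by those keys.
  // An exchange is needed unless the child is already hash-partitioned
  // on the same keys with the expected number of partitions.
  def needsExchange(child: ToyPartitioning, joinKeys: Seq[String], numPartitions: Int): Boolean =
    child match {
      case ToyHash(keys, n) => !(keys == joinKeys && n == numPartitions)
      case ToyUnknown       => true
    }

  def main(args: Array[String]): Unit = {
    // Output of "a JOIN b ON a.key = b.key" is already hash-partitioned on the key.
    val firstJoinOutput = ToyHash(Seq("key"), 200)
    // A fresh table scan has no known partitioning.
    val scanOfC = ToyUnknown
    println(needsExchange(firstJoinOutput, Seq("key"), 200)) // false: reuse the distribution
    println(needsExchange(scanOfC, Seq("key"), 200))         // true: table c must be shuffled
  }
}
```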
Treating file-system-based data sources as “UnknownPartitioning” is a simple
and SAFE choice for JOIN, since it’s hard to guarantee that records with
identical join keys from different data sets will be loaded by the same
node/task; many factors need to be considered, such as task pool size,
cluster size, source format, storage, data locality, etc.
I agree it’s worth optimizing for performance; in Hive, this is called a
bucket join. I am not sure whether that will happen soon in Spark SQL.
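For illustration only, here is a rough sketch of the idea behind a bucket join, in plain Scala with no Spark or Hive dependency. It assumes both sides were written with the same bucket count, bucketed on the join key (here `key % 2`), and sorted within each bucket. `mergeJoin` and `joinBuckets` are hypothetical helpers, not an existing API. Under those assumptions, bucket i of one side can be merge-joined with bucket i of the other without any shuffle or sort.

```scala
// Toy bucket join over pre-bucketed, pre-sorted data; not Hive's or Spark's code.
object BucketMergeJoinSketch {
  // Merge-join two sequences that are already sorted by key.
  def mergeJoin[A, B](left: Vector[(Int, A)], right: Vector[(Int, B)]): Vector[(Int, A, B)] = {
    val out = Vector.newBuilder[(Int, A, B)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val lk = left(i)._1
      val rk = right(j)._1
      if (lk < rk) i += 1
      else if (lk > rk) j += 1
      else {
        // Emit this left row against the whole run of equal keys on the right.
        var jj = j
        while (jj < right.length && right(jj)._1 == lk) {
          out += ((lk, left(i)._2, right(jj)._2))
          jj += 1
        }
        // Keep j in place so a following left row with the same key sees the run again.
        i += 1
      }
    }
    out.result()
  }

  // Join bucket i of the left side with bucket i of the right side.
  def joinBuckets[A, B](l: Vector[Vector[(Int, A)]],
                        r: Vector[Vector[(Int, B)]]): Vector[(Int, A, B)] = {
    require(l.length == r.length, "both sides must use the same bucket count")
    l.zip(r).flatMap { case (lb, rb) => mergeJoin(lb, rb) }
  }

  def main(args: Array[String]): Unit = {
    // Two buckets per side: bucket 0 holds even keys, bucket 1 holds odd keys.
    val left  = Vector(Vector(2 -> "l2", 4 -> "l4"), Vector(1 -> "l1", 3 -> "l3"))
    val right = Vector(Vector(4 -> "r4"), Vector(1 -> "r1a", 1 -> "r1b"))
    println(joinBuckets(left, right))
  }
}
```

The hard part in practice is not the merge itself but guaranteeing that the on-disk layout really matches on both sides, which is the safety concern raised above.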
Hao
From: Alex Nastetsky [mailto:[email protected]]
Sent: Monday, November 2, 2015 11:29 AM
To: user
Subject: Sort Merge Join
Hi,
I'm trying to understand SortMergeJoin (SPARK-2213).
1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
example, in the code below, the two datasets have different number of
partitions, but it still does a SortMerge join after a "hashpartitioning".
CODE:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf()
  .setAppName("SortMergeJoinTest")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.eventLog.enabled", "true")
  .set("spark.sql.planner.sortMergeJoin", "true")
sparkConf.setMaster("local-cluster[3,1,1024]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val inputpath = "input.gz.parquet"
// Read the same file twice, with a different number of partitions on each side.
val df1 = sqlContext.read.parquet(inputpath).repartition(3)
val df2 = sqlContext.read.parquet(inputpath).repartition(5)
val result = df1.join(df2.withColumnRenamed("foo", "foo2"), $"foo" === $"foo2")
result.explain()
result.explain()
OUTPUT:
== Physical Plan ==
SortMergeJoin [foo#0], [foo2#8]
 TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
   ConvertToUnsafe
    Repartition 3, true
     Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
 TungstenSort [foo2#8 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#8)
   TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
    Repartition 5, true
     Scan ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]
2) If both datasets have already been previously partitioned/sorted the same
and stored on the file system (e.g. in a previous job), is there a way to tell
Spark this so that it won't want to do a "hashpartitioning" on them? It looks
like Spark just considers datasets that have just been read from the file
system to have UnknownPartitioning. In the example below, I try to join a
dataframe to itself, and it still wants to hash repartition.
CODE:
...
val df1 = sqlContext.read.parquet(inputpath)
val result = df1.join(df1.withColumnRenamed("foo","foo2"), $"foo" ===
$"foo2")
result.explain()
OUTPUT:
== Physical Plan ==
SortMergeJoin [foo#0], [foo2#4]
 TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
   ConvertToUnsafe
    Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
 TungstenSort [foo2#4 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#4)
   ConvertToUnsafe
    Project [foo#5 AS foo2#4,bar#6L,somefield#7,anotherfield#8]
     Scan ParquetRelation[file:input.gz.parquet][foo#5,bar#6L,somefield#7,anotherfield#8]
Thanks.