“Thoughts on this approach?“
Just to warn you this is a hazardous optimization without cardinality
information. Removing columns from the hash exchange reduces entropy
potentially resulting in skew. Also keep in mind that if you reduce the number
of columns on one side of the join you need todo it on the other. This will
require you to rewrite EnsureRequirements or add a special case to detect this.
As a word of warning there’s a whole bunch of subtle things that
EnsureRequirements is doing and its really easy to unintentionally create
performance regressions while making improvements in other areas.
“Could someone help explain why the different join types have different output
partitionings“
Long story short when a join happens the join exec zips together the partitions
of the left and right side so that one partition of the join has the elements
of the left and right. In the case of an inner join this means that that the
resulting RDD is now partitioned by both the left join keys and the right join
keys. I’d suggest taking a look at the join execs and take a look at how they
build the result RDD from the partitions of the left and right RDDs.(see
doExecute(…)) left/right outer does look surprising though.
You should see something like…
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
Cheers Andrew
From: Brett Marcott <[email protected]>
Date: Tuesday, December 31, 2019 at 11:49 PM
To: "[email protected]" <[email protected]>
Subject: SortMergeJoinExec: Utilizing child partitioning when joining
Hi all,
I found this jira for an issue I ran into recently:
https://issues.apache.org/jira/browse/SPARK-28771
My initial idea for a fix is to change SortMergeJoinExec's (and
ShuffledHashJoinExec) requiredChildDistribution.
At least if all below conditions are met, we could only require a subset of
keys for partitioning:
left and right children's output partitionings are hashpartitioning with same
numpartitions
left and right partition expressions have the same subset (with regards to
indices) of their respective join keys
If that subset of keys is returned by requiredChildDistribution, then
EnsureRequirements.ensureDistributionAndOrdering would not add a shuffle stage,
hence reusing the children's partitioning.
1.Thoughts on this approach?
2. Could someone help explain why the different join types have different
output partitionings in
SortMergeJoinExec.outputPartitioning<https://github.com/apache/spark/blob/cdcd43cbf2479b258f4c5cfa0f6306f475d25cf2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L85-L96>?
Thanks,
Brett