1. Where can I find information on how to run standard performance tests/benchmarks?

2. Are performance degradations to existing queries disallowed for a new major Spark version, even when they can be fixed by rewriting the query into an equivalent one?

On Thu, Jan 2, 2020 at 3:05 PM Brett Marcott <brett.marc...@gmail.com> wrote:

> Thanks for the response Andrew.
>
> *1. The approach*
> The approach I mentioned will not introduce any new skew, so it should
> only worsen performance if the user was relying on the shuffle to fix
> skew they had before.
> The user can address this by either not introducing their own skewed
> partitioning in the first place, or by repartitioning with less skew
> again before the join.
> Today the user cannot change partitioning without changing the join
> condition in a hacky way: joinkey1 >= joinkey2 && joinkey1 <= joinkey2
> (sketched below).
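> For illustration, a minimal sketch of that workaround in the DataFrame
> API (the column names k1/k2 and the toy data are hypothetical):
>
>     import org.apache.spark.sql.SparkSession
>
>     val spark = SparkSession.builder().master("local[*]").getOrCreate()
>     import spark.implicits._
>
>     val left = Seq((1, 10, "a")).toDF("k1", "k2", "v")
>     val right = Seq((1, 10, "b")).toDF("k1", "k2", "w")
>
>     // Join on (k1, k2), but express k2's equality as a pair of range
>     // predicates. Only k1 is extracted as an equi-join key, so the
>     // exchange partitions by k1 alone; the k2 bounds are evaluated as
>     // an extra join condition. Same rows out, different partitioning.
>     val joined = left.join(
>       right,
>       left("k1") === right("k1") &&
>         left("k2") >= right("k2") && left("k2") <= right("k2"))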
> The condition I mentioned below ensures that the *same* keys on the left
> and right formed their respective subsets:
>
>     left and right partition expressions have the same subset (with
>     regards to indices) of their respective join keys
>
> I don't believe EnsureRequirements will require any changes, just what
> the Execs are saying is required.
>
> *2. output partitionings*
> Yeah, I got as far as you mentioned, but I didn't at first get why for
> outer joins only one side is used.
> Now, however, I think it makes sense, because for outer joins you may be
> introducing nulls for at least one side, which makes that side's
> partitioning invalid, right?
>
> Warm New Year Regards,
> Brett
>
> On Thu, Jan 2, 2020 at 2:28 PM Long, Andrew <loand...@amazon.com> wrote:
>
>> "Thoughts on this approach?"
>>
>> Just to warn you, this is a hazardous optimization without cardinality
>> information. Removing columns from the hash exchange reduces entropy,
>> potentially resulting in skew. Also keep in mind that if you reduce the
>> number of columns on one side of the join, you need to do it on the
>> other. This will require you to rewrite EnsureRequirements or add a
>> special case to detect this.
>>
>> As a word of warning, there is a whole bunch of subtle things that
>> EnsureRequirements is doing, and it's really easy to unintentionally
>> create performance regressions while making improvements in other
>> areas.
>>
>> "Could someone help explain why the different join types have different
>> output partitionings"
>>
>> Long story short, when a join happens, the join exec zips together the
>> partitions of the left and right side so that one partition of the join
>> has the elements of the left and right. In the case of an inner join,
>> this means that the resulting RDD is now partitioned by both the left
>> join keys and the right join keys. I'd suggest taking a look at the
>> join execs and at how they build the result RDD from the partitions of
>> the left and right RDDs (see doExecute(...)). Left/right outer does
>> look surprising though.
>>
>> You should see something like...
>>
>>     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
>>
>> Cheers,
>> Andrew
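>> For reference, the outputPartitioning match in question (lightly
>> paraphrased from the SortMergeJoinExec source that Brett links below,
>> with explanatory comments added) looks roughly like this:
>>
>>     override def outputPartitioning: Partitioning = joinType match {
>>       // Inner joins: rows from both inputs survive unchanged, so the
>>       // output can claim both children's partitionings.
>>       case _: InnerLike =>
>>         PartitioningCollection(
>>           Seq(left.outputPartitioning, right.outputPartitioning))
>>       // One-sided outer joins: the other side gains null rows, so only
>>       // the preserved side's partitioning still holds.
>>       case LeftOuter => left.outputPartitioning
>>       case RightOuter => right.outputPartitioning
>>       // Full outer: both sides can gain nulls, so nothing is preserved.
>>       case FullOuter =>
>>         UnknownPartitioning(left.outputPartitioning.numPartitions)
>>       // Semi/anti joins emit only left rows; left's partitioning holds.
>>       case LeftExistence(_) => left.outputPartitioning
>>       case x => throw new IllegalArgumentException(
>>         s"${getClass.getSimpleName} should not take $x as the JoinType")
>>     }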
>> *From:* Brett Marcott <brett.marc...@gmail.com>
>> *Date:* Tuesday, December 31, 2019 at 11:49 PM
>> *To:* "dev@spark.apache.org" <dev@spark.apache.org>
>> *Subject:* SortMergeJoinExec: Utilizing child partitioning when joining
>>
>> Hi all,
>>
>> I found this jira for an issue I ran into recently:
>> https://issues.apache.org/jira/browse/SPARK-28771
>>
>> My initial idea for a fix is to change SortMergeJoinExec's (and
>> ShuffledHashJoinExec's) requiredChildDistribution (a rough sketch
>> follows at the end of this message).
>>
>> At least if all of the conditions below are met, we could require only
>> a subset of the keys for partitioning:
>>
>> - left and right children's output partitionings are HashPartitioning
>>   with the same numPartitions
>> - left and right partition expressions are the same subset (with
>>   regards to indices) of their respective join keys
>>
>> If that subset of keys is returned by requiredChildDistribution, then
>> EnsureRequirements.ensureDistributionAndOrdering would not add a
>> shuffle stage, hence reusing the children's partitioning.
>>
>> 1. Thoughts on this approach?
>>
>> 2. Could someone help explain why the different join types have
>> different output partitionings in SortMergeJoinExec.outputPartitioning
>> <https://github.com/apache/spark/blob/cdcd43cbf2479b258f4c5cfa0f6306f475d25cf2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L85-L96>?
>>
>> Thanks,
>> Brett
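>> For concreteness, a rough sketch of the idea (matchingKeySubset is an
>> imagined helper, not an existing Spark API; today the method simply
>> requires hash clustering on the full key lists):
>>
>>     // Sketch only: if both children already hash-partition on the same
>>     // subset of join-key positions with equal numPartitions, require
>>     // only that subset, so EnsureRequirements inserts no exchange.
>>     override def requiredChildDistribution: Seq[Distribution] =
>>       matchingKeySubset(left, right, leftKeys, rightKeys) match {
>>         case Some((leftSubset, rightSubset)) =>
>>           HashClusteredDistribution(leftSubset) ::
>>             HashClusteredDistribution(rightSubset) :: Nil
>>         case None =>
>>           // Fall back to today's behavior: cluster on the full keys.
>>           HashClusteredDistribution(leftKeys) ::
>>             HashClusteredDistribution(rightKeys) :: Nil
>>       }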