Hi all, I also like this idea very much, and I think it may bring other performance improvements in the future as well.
Thanks to everybody who worked on this. I agree with targeting this feature for 3.0.

Thanks everybody, Bests.
Marco

On Tue, 31 Jul 2018, 08:39 Wenchen Fan, <cloud0...@gmail.com> wrote:

Hi Carson and Yuanjian,

Thanks for contributing to this project and sharing the production use cases! I believe adaptive execution will be a very important feature of Spark SQL and will definitely benefit a lot of users.

I went through the design docs and the high-level design totally makes sense to me. Since the code freeze of Spark 2.4 is close, I'm afraid we may not have enough time to review the code and merge it; how about we target this feature for Spark 3.0?

Besides, it would be great if we could have some real benchmark numbers for it.

Thanks,
Wenchen

On Tue, Jul 31, 2018 at 2:26 PM Yuanjian Li <xyliyuanj...@gmail.com> wrote:

Thanks Carson, great note!
Baidu has actually ported this patch into our internal fork. I collected some use cases and the performance gains observed during Baidu's internal usage of the patch, summarized as the following three scenarios:

1. SortMergeJoin to BroadcastJoin
Transforming a SortMergeJoin into a BroadcastJoin deep in the plan tree brings us a 50% to 200% improvement in query performance. This strategy is frequently hit in BI scenarios, such as joining several tables that are filtered in subqueries.

2. Long-running applications, or Spark used as a service
Here, a long-running application means one whose duration is close to an hour. Using Spark as a service means keeping a spark-shell session open and repeatedly submitting SQL, or using services built on Spark such as Zeppelin, Livy, or our internal SQL service, Baidu BigSQL. In such scenarios all Spark jobs share the same shuffle partition number, so enabling AE and adding configs describing the expected tasks (data size, row number, min/max partition number, etc.) brings a 50%-100% performance improvement.

3. GraphFrame jobs
The last scenario is applications using GraphFrame. In this case, a user has a two-dimensional graph with 1 billion edges and runs the connected components algorithm in GraphFrame. With AE enabled, the application duration drops from 58 min to 32 min, almost a 100% performance improvement.

The detailed screenshots and configs are in the PDF attached to JIRA SPARK-23128 <https://issues.apache.org/jira/browse/SPARK-23128>.

Thanks,
Yuanjian Li

On Sat, Jul 28, 2018 at 12:49 AM Wang, Carson <carson.w...@intel.com> wrote:

Dear all,

The initial support for adaptive execution [SPARK-9850 <https://issues.apache.org/jira/browse/SPARK-9850>] in Spark SQL has been there since Spark 1.6, but there have been no further updates since then. One of the key features of adaptive execution is determining the number of reducers automatically at runtime. This is a feature required by many Spark users, especially infrastructure teams at many companies, as there are thousands of queries running on a cluster and the shuffle partition number may not be set properly for every query. The same shuffle partition number also doesn't work well for all stages in a query, because each stage has a different input data size. Other features of adaptive execution include optimizing the join strategy at runtime and handling skewed joins automatically, which have not been implemented in Spark.

In the current implementation, an ExchangeCoordinator is used to determine the number of post-shuffle partitions for a stage. However, the coordinator is added while each Exchange is being added, so it actually lacks a global picture of all the shuffle dependencies of a post-shuffle stage. E.g. for a three-table join in a single stage, the same ExchangeCoordinator should be used across the three Exchanges, but currently two separate ExchangeCoordinators will be added. It also adds additional Exchanges in some cases. So I think it is time to rethink how to better support adaptive execution in Spark SQL. I have proposed a new approach in SPARK-23128 <https://issues.apache.org/jira/browse/SPARK-23128>. A document describing the idea is available here <https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing>, and the idea of changing a sort merge join to a broadcast hash join at runtime is described in a separate doc <https://docs.google.com/document/d/1WCJ2BmA8_dJL_jmYie_x9ZCrz7r3ZjleJSoX0dlDXaw/edit?usp=sharing>.
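To make the three-table case concrete, here is an illustrative spark-shell snippet (the table names and join key are made up for the example):

    // Illustrative only: three tables joined on the same key. With sort merge
    // joins, the shuffle outputs of t1, t2 and t3 are all read by a single
    // reduce stage, so the post-shuffle partition number for that stage should
    // be decided from all three shuffle dependencies together, not per Exchange.
    val joined = spark.table("t1")
      .join(spark.table("t2"), "key")
      .join(spark.table("t3"), "key")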
The docs have been there for a while, and I also have an implementation based on Spark 2.3 available at https://github.com/Intel-bigdata/spark-adaptive. The code is split into 7 PRs labeled AE2.3-0x if you look at the pull requests. I asked many partners, including Baidu, Alibaba, JD.com, etc., to evaluate the patch and received very good feedback. Baidu also shared their results on the JIRA. We also finished a 100 TB TPC-DS benchmark earlier using the patch, which passed all queries with good performance improvements.

I'd like to call for a review of the docs and even the code, and we can discuss further in this thread. Thanks very much!

Thanks,

Carson
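For anyone trying the Spark 2.3 branch above, a minimal spark-shell sketch for exercising the post-shuffle coalescing, using the configs that exist in the upstream 2.x line (the fork adds further knobs such as row-count targets and join conversion; check the branch itself for the exact names):

    // Let the planner adjust the number of post-shuffle partitions at runtime.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    // Rough target input size per post-shuffle partition; small shuffle
    // partitions are coalesced until each reducer reads about this much data.
    spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "64MB")
    // Advisory lower bound on the post-shuffle partition count after coalescing.
    spark.conf.set("spark.sql.adaptive.minNumPostShufflePartitions", "1")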