Thanks Marco and Wenchen for reviewing. It sounds good to target this for 3.0.
I can also share more data on the benchmark. In the 100 TB TPC-DS benchmark we performed on a 100-node cluster, 90% of the 103 queries showed a performance gain, and 46% of them were more than 1.1x faster. Individual queries gained up to 3.8x (q8: 3.8x, q81: 2.1x, q30: 2.1x, q51: 1.8x, q61: 1.6x, q60: 1.6x, …). In addition, 5 queries that failed before now pass successfully in adaptive execution mode. The detailed report is also available here<https://software.intel.com/en-us/articles/spark-sql-adaptive-execution-at-100-tb>.

Thanks,
Carson

From: Marco Gaido [mailto:marcogaid...@gmail.com]
Sent: Tuesday, July 31, 2018 3:00 PM
To: Wenchen Fan <cloud0...@gmail.com>
Cc: xyliyuanj...@gmail.com; Wang, Carson <carson.w...@intel.com>; Spark dev list <dev@spark.apache.org>
Subject: Re: [DISCUSS] Adaptive execution in Spark SQL

Hi all,

I also like this idea very much and I think it may also bring other performance improvements in the future. Thanks to everybody who worked on this. I agree to target this feature for 3.0.

Thanks everybody,
Bests.
Marco

On Tue, 31 Jul 2018, 08:39 Wenchen Fan, <cloud0...@gmail.com<mailto:cloud0...@gmail.com>> wrote:

Hi Carson and Yuanjian,

Thanks for contributing to this project and sharing the production use cases! I believe adaptive execution will be a very important feature of Spark SQL and will definitely benefit a lot of users. I went through the design docs and the high-level design totally makes sense to me. Since the code freeze of Spark 2.4 is close, I'm afraid we may not have enough time to review the code and merge it; how about we target this feature for Spark 3.0? Besides, it would be great if we could have some real benchmark numbers for it.

Thanks,
Wenchen

On Tue, Jul 31, 2018 at 2:26 PM Yuanjian Li <xyliyuanj...@gmail.com<mailto:xyliyuanj...@gmail.com>> wrote:

Thanks Carson, great note! Baidu has ported this patch into our internal fork. I collected some use cases and the performance improvements seen during Baidu's internal usage of this patch, summarized in the following three scenarios:

1. SortMergeJoin to BroadcastJoin
Converting a SortMergeJoin to a BroadcastJoin deep in the plan tree can bring a 50% to 200% boost in query performance. This strategy is frequently hit in BI scenarios, such as joining several tables that are filtered in subqueries.

2. Long-running applications or Spark as a service
Here, a long-running application means one whose duration is close to an hour. Using Spark as a service means keeping a spark-shell session open and repeatedly submitting SQL, or using a service such as Zeppelin, Livy, or our internal SQL service Baidu BigSQL. In these scenarios all Spark jobs share the same shuffle partition number, so enabling AE and adding the configs that describe the expected task (data size, row count, min/max partition number, etc.) brings a 50%-100% performance improvement. (See the configuration sketch after this message.)

3. GraphFrame jobs
The last scenario is an application using GraphFrame: the user has a two-dimensional graph with 1 billion edges and runs the connected components algorithm in GraphFrame. With AE enabled, the application duration dropped from 58 min to 32 min, almost a 100% performance improvement.

Detailed screenshots and configs are in the PDF attached to the JIRA SPARK-23128<https://issues.apache.org/jira/browse/SPARK-23128>.

Thanks,
Yuanjian Li
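To make scenario 2 concrete, here is a minimal sketch of the kind of session-level configuration being described. The config keys follow the Spark 2.3-based Intel-bigdata/spark-adaptive patch; the exact names, values, and defaults are assumptions for illustration and may differ from what Baidu actually used.

import org.apache.spark.sql.SparkSession

// Illustrative only: these keys follow the Spark 2.3-based
// Intel-bigdata/spark-adaptive patch; verify names against the build you run.
val spark = SparkSession.builder()
  .appName("ae-config-sketch")
  // Turn on adaptive execution.
  .config("spark.sql.adaptive.enabled", "true")
  // Describe the "expected task": target bytes per post-shuffle partition,
  // plus lower/upper bounds on the post-shuffle partition count.
  .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864") // ~64 MB
  .config("spark.sql.adaptive.minNumPostShufflePartitions", "20")
  .config("spark.sql.adaptive.maxNumPostShufflePartitions", "2000")
  // Allow runtime conversion of sort-merge joins to broadcast joins (scenario 1).
  .config("spark.sql.adaptive.join.enabled", "true")
  .getOrCreate()

// Every query submitted through this long-lived session (spark-shell, Livy,
// Zeppelin, an internal SQL service, ...) now gets its reducer count chosen
// per stage at runtime instead of sharing one static
// spark.sql.shuffle.partitions value.
spark.sql("SELECT COUNT(*) FROM range(1000000)").show()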
On Sat, Jul 28, 2018 at 12:49 AM Wang, Carson <carson.w...@intel.com<mailto:carson.w...@intel.com>> wrote:

Dear all,

The initial support of adaptive execution [SPARK-9850<https://issues.apache.org/jira/browse/SPARK-9850>] in Spark SQL has been there since Spark 1.6, but there has been no further update since then. One of the key features in adaptive execution is to determine the number of reducers automatically at runtime. This is a feature required by many Spark users, especially the infrastructure teams in many companies, as there are thousands of queries running on the cluster where the shuffle partition number may not be set properly for every query. The same shuffle partition number also doesn't work well for all stages in a query, because each stage has a different input data size. Other features in adaptive execution include optimizing the join strategy at runtime and handling skewed joins automatically, which have not been implemented in Spark.

In the current implementation, an exchange coordinator is used to determine the number of post-shuffle partitions for a stage. However, the exchange coordinator is added when the Exchange is added, so it lacks a global picture of all shuffle dependencies of a post-shuffle stage. For example, in a three-table join executed in a single stage, the same ExchangeCoordinator should be shared by the three Exchanges, but currently two separate ExchangeCoordinators are added. It also adds additional Exchanges in some cases. So I think it is time to rethink how to better support adaptive execution in Spark SQL.

I have proposed a new approach in SPARK-23128<https://issues.apache.org/jira/browse/SPARK-23128>. A document about the idea is available here<https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing>. The idea of how to change a sort merge join to a broadcast hash join at runtime is also described in a separate doc<https://docs.google.com/document/d/1WCJ2BmA8_dJL_jmYie_x9ZCrz7r3ZjleJSoX0dlDXaw/edit?usp=sharing>. The docs have been there for a while, and I also have an implementation based on Spark 2.3 available at https://github.com/Intel-bigdata/spark-adaptive. The code is split into 7 PRs labeled AE2.3-0x if you look at the pull requests. I asked many partners to evaluate the patch, including Baidu, Alibaba, JD.com, etc., and received very good feedback. Baidu also shared their results on the JIRA. We also finished a 100 TB TPC-DS benchmark earlier using the patch, in which all queries passed with good performance improvements.

I'd like to call for a review of the docs and even the code, and we can discuss further in this thread. Thanks very much!

Thanks,
Carson
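As a back-of-the-envelope illustration of why a single static shuffle partition number cannot fit every stage, the sketch below applies the kind of rule adaptive execution uses: derive each stage's reducer count from the runtime map output size and a target per-partition size. The helper function and the numbers are hypothetical, not code from the patch or the design docs.

// Illustrative sketch of the per-stage rule adaptive execution applies:
// pick the reducer count for each shuffle from the map output statistics
// gathered at runtime, instead of one static spark.sql.shuffle.partitions.
// Hypothetical helper, not code from the patch.
def postShufflePartitions(mapOutputBytes: Long, targetBytesPerPartition: Long): Int =
  math.max(1, math.ceil(mapOutputBytes.toDouble / targetBytesPerPartition).toInt)

// A query with two shuffle stages of very different sizes, 64 MB target:
val bigStage   = postShufflePartitions(200L * 1024 * 1024 * 1024, 64L * 1024 * 1024) // 3200 reducers
val smallStage = postShufflePartitions(500L * 1024 * 1024,        64L * 1024 * 1024) // 8 reducers

// With a fixed spark.sql.shuffle.partitions=200, the big stage gets ~1 GB per
// task while the small stage launches 200 tiny tasks of ~2.5 MB each;
// choosing the count per stage at runtime avoids both problems.
println(s"big stage: $bigStage partitions, small stage: $smallStage partitions")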