Dear all,

The initial support for adaptive execution [SPARK-9850<https://issues.apache.org/jira/browse/SPARK-9850>] in Spark SQL has been there since Spark 1.6, but there have been no updates since then. One of the key features of adaptive execution is determining the number of reducers automatically at runtime. This is a feature required by many Spark users, especially the infrastructure teams at many companies: with thousands of queries running on a cluster, the shuffle partition number may not be set properly for every query, and a single shuffle partition number also doesn't work well for all stages in a query, because each stage has a different input data size. Other adaptive execution features, such as optimizing the join strategy at runtime and handling skewed joins automatically, have not been implemented in Spark.
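To make the reducer-count idea concrete, here is a minimal sketch (in Python, with hypothetical names; this is not Spark's actual implementation) of the underlying intuition: after the map stage finishes, the actual size of each map-output partition is known, so adjacent small partitions can be greedily grouped until each reducer reads roughly a target amount of data.

```python
# Hypothetical sketch: coalesce post-shuffle partitions at runtime so each
# reducer reads roughly target_size bytes. Names are illustrative only.

def coalesce_partitions(partition_sizes, target_size):
    """Group adjacent shuffle partitions so each group's total size stays
    close to target_size; returns (start, end_exclusive) index ranges."""
    groups = []
    start = 0
    current = 0
    for i, size in enumerate(partition_sizes):
        # Close the current group once adding this partition would exceed
        # the target (and the group is non-empty).
        if current > 0 and current + size > target_size:
            groups.append((start, i))
            start = i
            current = 0
        current += size
    groups.append((start, len(partition_sizes)))
    return groups

# Example: 5 map-output partitions with skewed sizes, 64 (MB) target.
sizes = [10, 60, 5, 5, 50]
print(coalesce_partitions(sizes, target_size=64))
# -> [(0, 1), (1, 2), (2, 5)]: three reducers instead of five.
```

The point is that this decision can only be made well with runtime statistics, which is why a fixed spark.sql.shuffle.partitions setting cannot serve every query and every stage.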
In the current implementation, an ExchangeCoordinator is used to determine the number of post-shuffle partitions for a stage. However, the ExchangeCoordinator is added when an Exchange is added, so it lacks a global picture of all the shuffle dependencies of a post-shuffle stage. For example, for a three-table join in a single stage, the same ExchangeCoordinator should be used across the three Exchanges, but currently two separate ExchangeCoordinators are added. It also adds extra Exchanges in some cases. So I think it is time to rethink how to better support adaptive execution in Spark SQL.

I have proposed a new approach in SPARK-23128<https://issues.apache.org/jira/browse/SPARK-23128>. A document describing the idea is available here<https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing>. The idea of changing a sort merge join to a broadcast hash join at runtime is described in a separate doc<https://docs.google.com/document/d/1WCJ2BmA8_dJL_jmYie_x9ZCrz7r3ZjleJSoX0dlDXaw/edit?usp=sharing>.

The docs have been available for a while, and I also have an implementation based on Spark 2.3 at https://github.com/Intel-bigdata/spark-adaptive. The code is split into 7 PRs labeled AE2.3-0x if you look at the pull requests. I asked many partners, including Baidu, Alibaba, and JD.com, to evaluate the patch and received very good feedback. Baidu also shared their results on the JIRA. We also finished a 100 TB TPC-DS benchmark using the patch, which passed all queries with good performance improvements.

I'd like to call for a review of the docs, and even the code, and we can discuss further in this thread. Thanks very much!

Thanks,
Carson