Hi Carson and Yuanjian,

Thanks for contributing to this project and sharing the production use
cases! I believe adaptive execution will be a very important feature of
Spark SQL and will definitely benefit a lot of users.

I went through the design docs and the high-level design totally makes
sense to me. Since the code freeze for Spark 2.4 is close, I'm afraid we may
not have enough time to review the code and merge it. How about we target
this feature for Spark 3.0?

Besides, it would be great if we could have some real benchmark numbers for
it.

Thanks,
Wenchen

On Tue, Jul 31, 2018 at 2:26 PM Yuanjian Li <xyliyuanj...@gmail.com> wrote:

> Thanks Carson, great note!
> Actually, Baidu has ported this patch into our internal fork. I collected
> some use cases and performance-improvement results from Baidu's internal
> usage of this patch, summarized in the following three scenarios:
> 1. SortMergeJoin to BroadcastJoin
> Transforming a SortMergeJoin into a BroadcastJoin deep in the plan tree
> can bring us a 50% to 200% boost in query performance, and this strategy
> commonly hits BI scenarios, such as joining several tables with filters in
> subqueries.
> 2. Long-running applications or using Spark as a service
> In this case, a long-running application refers to one whose duration is
> near 1 hour. Using Spark as a service refers to using spark-shell and
> repeatedly submitting SQL, or using a Spark service such as Zeppelin, Livy,
> or our internal SQL service Baidu BigSQL. In such scenarios, all Spark jobs
> share the same partition number, so enabling AE and adding configs about
> the expected task info, including data size, row number, min/max partition
> number, etc. (see the config sketch after this list), brings us a 50%-100%
> performance improvement.
> 3. GraphFrame jobs
> The last scenario is an application using GraphFrame. In this case, the
> user has a 2-dimensional graph with 1 billion edges and runs the connected
> components algorithm in GraphFrame (a minimal sketch follows below). With
> AE enabled, the application duration dropped from 58 min to 32 min, an
> almost 100% performance improvement.
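>
> To illustrate the configuration mentioned in scenario 2, here is a minimal
> Scala sketch of enabling AE for such jobs. The config keys below are the
> ones used in the Intel-bigdata/spark-adaptive fork of Spark 2.3; treat the
> exact key names and values as assumptions, since they may differ in other
> builds:
>
>   import org.apache.spark.sql.SparkSession
>
>   // Enable adaptive execution and bound how reducers are sized at
>   // runtime: target bytes per post-shuffle partition plus min/max
>   // post-shuffle partition numbers (key names are fork-specific).
>   val spark = SparkSession.builder()
>     .appName("ae-demo")
>     .config("spark.sql.adaptive.enabled", "true")
>     .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864") // 64 MB
>     .config("spark.sql.adaptive.minNumPostShufflePartitions", "20")
>     .config("spark.sql.adaptive.maxNumPostShufflePartitions", "2000")
>     .getOrCreate()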
>
> The detailed screenshots and configs are in the PDF attached to JIRA
> SPARK-23128 <https://issues.apache.org/jira/browse/SPARK-23128>.
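>
> As an aside, here is a minimal sketch of the GraphFrame connected
> components job from scenario 3. The input path, column names, and
> checkpoint directory are hypothetical placeholders:
>
>   import org.apache.spark.sql.SparkSession
>   import org.apache.spark.sql.functions.col
>   import org.graphframes.GraphFrame
>
>   val spark = SparkSession.builder().appName("cc-demo").getOrCreate()
>
>   // Hypothetical edge list with "src" and "dst" columns; vertices are
>   // derived from the distinct endpoints.
>   val edges = spark.read.parquet("/data/edges")
>   val vertices = edges.select(col("src").as("id"))
>     .union(edges.select(col("dst").as("id"))).distinct()
>
>   // The connected components algorithm requires a checkpoint directory.
>   spark.sparkContext.setCheckpointDir("/tmp/cc-checkpoint")
>   val components = GraphFrame(vertices, edges).connectedComponents.run()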
>
> Thanks,
> Yuanjian Li
>
> Wang, Carson <carson.w...@intel.com> wrote on Sat, Jul 28, 2018, 12:49 AM:
>
>> Dear all,
>>
>>
>>
>> The initial support for adaptive execution [SPARK-9850
>> <https://issues.apache.org/jira/browse/SPARK-9850>] in Spark SQL has
>> been there since Spark 1.6, but there have been no further updates since
>> then. One of the key features in adaptive execution is determining the
>> number of reducers automatically at runtime. This is a feature required by
>> many Spark users, especially the infrastructure teams at many companies,
>> as there are thousands of queries running on a cluster where the shuffle
>> partition number may not be set properly for every query. The same shuffle
>> partition number also doesn't work well for all stages in a query, because
>> each stage has a different input data size. Other features in adaptive
>> execution include optimizing join strategies at runtime and handling
>> skewed joins automatically, which have not been implemented in Spark.
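>>
>> To illustrate the problem (a hypothetical sketch; the table names are
>> placeholders, not taken from the proposal): a single static setting
>> forces the same reducer count on every shuffle of every query:
>>
>>   // One value applies to every shuffle in every query, regardless of
>>   // how much data each stage actually moves.
>>   spark.conf.set("spark.sql.shuffle.partitions", "200") // the default
>>
>>   // A tiny aggregation and a huge join are both planned with 200
>>   // reducers, so at least one of them is almost certainly mis-sized.
>>   spark.sql("SELECT dept, count(*) FROM tiny_table GROUP BY dept")
>>   spark.sql("SELECT * FROM big_a JOIN big_b ON big_a.k = big_b.k")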
>>
>>
>>
>> In the current implementation, an ExchangeCoordinator is used to
>> determine the number of post-shuffle partitions for a stage. However, the
>> exchange coordinator is added when an Exchange is being added, so it
>> actually lacks a global picture of all shuffle dependencies of a
>> post-shuffle stage. That is, for a three-table join in a single stage, the
>> same ExchangeCoordinator should be used across the three Exchanges, but
>> currently two separate ExchangeCoordinators will be added (a sketch of
>> such a query follows below). It also adds additional Exchanges in some
>> cases. So I think it is time to rethink how to better support adaptive
>> execution in Spark SQL. I have proposed a new approach in SPARK-23128
>> <https://issues.apache.org/jira/browse/SPARK-23128>. A document about the
>> idea is described here
>> <https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing>.
>> The idea of how to change a sort merge join to a broadcast hash join at
>> runtime is also described in a separate doc
>> <https://docs.google.com/document/d/1WCJ2BmA8_dJL_jmYie_x9ZCrz7r3ZjleJSoX0dlDXaw/edit?usp=sharing>.
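>>
>> For concreteness, here is a hedged sketch of such a three-table join
>> (the DataFrame names t1, t2, t3 and the key column k are hypothetical):
>>
>>   // Each input introduces an Exchange that shuffles on k; the
>>   // post-shuffle stage reads from all three Exchanges, so a single
>>   // coordinator needs the full picture to pick one partition number.
>>   val joined = t1.join(t2, Seq("k")).join(t3, Seq("k"))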
>>
>>
>>
>>
>> The docs have been there for a while, and I also have an implementation
>> based on Spark 2.3 available at
>> https://github.com/Intel-bigdata/spark-adaptive. The code is split into
>> 7 PRs labeled AE2.3-0x if you look at the pull requests. I asked many
>> partners to evaluate the patch, including Baidu, Alibaba, JD.com, etc.,
>> and received very good feedback. Baidu also shared their results on the
>> JIRA. We also finished a 100 TB TPC-DS benchmark earlier using the patch,
>> which passed all queries with good performance improvements.
>>
>>
>>
>> I'd like to call for a review of the docs and even the code, and we can
>> discuss further in this thread. Thanks very much!
>>
>>
>>
>> Thanks,
>>
>> Carson
>>
>>
>>
>
