Hi Till,

Thanks for the comments!

I agree with you that we should avoid an auto-scaled job not able to be
scheduled
in standalone/reactive mode. And I think it's great if we can expose a
deployment
option that is consistent for streaming and batch jobs, which can be easier
to
understand. Just looking to the day to make both adaptive schedulers
default, so
that most users do not need to care about job tuning while the job can run
well.

Regarding the three options, personally I prefer to take *#1* as the first
step, to
limit the scope of this FLIP a bit, otherwise it may be too complicated.
I think *#3* is the final goal we need to target later, so that mixed
bounded and
unbounded workloads can be supported. Given that there can be multiple
stages scheduled at the same time, the design of the scheduling may not be
very straightforward and needs some thorough consideration.
*#2* can be a very good improvement itself. Shuffles of batch jobs can be
auto-determined to be pipelined or blocking according to available
resources.
But the changes may involve many components and can be large. So I think
it can be a standalone future improvement.

Regarding the side note to abstract subpartitions as splits, the idea is
very
interesting to me. Besides supporting auto scaling, I think trackable
produced
splits can also help in troubleshooting and give some insights for future
improvements. Collecting data sizes for batch adaptive scheduler can be the
first step and we can further consider the abstraction of it.

Thanks,
Zhu

Till Rohrmann <trohrm...@apache.org> 于2021年10月29日周五 下午10:47写道:

> Hi Lijie,
>
> Thanks for drafting this FLIP together with Zhu Zhu :-)
>
> I like the idea of making the parallelism of operators of a bounded job
> dependent on the data size. This makes the job adjust automatically when
> the data sources/sizes change.
>
> I can see this work well in combination with the active mode where Flink
> can ask for more resources.
>
> In the case of the standalone mode, I think it can lead to situations where
> one and the same job can be scheduled or not depending on the input data.
> The problem is pipelined regions that contain more than a single operator
> instance (e.g. pipelined shuffles). We already have this problem when
> submitting a batch job with too high parallelism onto a standalone cluster.
> However, with the adaptive batch mode this problem might become a bit more
> present. So my question would be how can we solve this problem (potentially
> in a follow up step). I could think of the following three alternatives
> atm:
>
> 1. Only allow blocking data exchanges: This will limit the size of a
> pipelined region to a single operator instance. This has the downside that
> we no longer support pipelined execution of multiple operators (other than
> chained). Moreover, it requires the user to set all data exchanges to
> blocking which cannot be enforced atm.
> 2. Introduce a new pipelined-blocking data exchange hybrid that supports
> pipelined data exchanges but can also spill to disk if there is no
> consumer: This could allow to still make progress in case that one has a
> pipelined region which requires more slots than what we currently have.
> 3. Decide on the actual parallelism of a pipelined region after having
> received the slots that are declared based on the data size per subtask. If
> the pipelined region contains an all-to-all connection, then the
> parallelism is how many slots we currently have. If not, then the
> parallelism can be decided by the data volume: This would effectively mean
> to enable the existing AdaptiveScheduler to also run batch workloads.
>
> With either of these options, I believe that we could provide a somewhat
> consistent behaviour across the different deployment and execution modes
> wrt to scaling:
>
> a) Active + streaming job that uses AdaptiveScheduler: Can run with fewer
> slots than requested. Can ask for more slots. Once new slots arrive it will
> make use of it.
> b) Reactive + streaming job that uses AdaptiveScheduler: Can run with fewer
> slots than requested. Once new slots arrive it will make use of it.
> c) Active + batch job that uses batch adaptive scheduler + any of 1., 2. or
> 3.: Can run with fewer slots than requested (because it can complete the
> job with a single slot). Can ask for more slots. Once new slots arrive it
> will make use of it.
> b) Standalone + batch job that uses batch adaptive scheduler + any of 1.,
> 2. or 3.: Can run with fewer slots than requested (because it can complete
> the job with a single slot). Once new slots arrive it will make use of it
> (up to the desired maximum parallelism).
>
> If we decide to go with option 1. or 2., then we will only be able to run
> mixed workloads (mixture of bounded and unbounded sources) in streaming
> mode. This might be ok for the time being.
>
> This actually leads to my main concern, which is to give our users
> consistent and somewhat easy to understand deployment options. In order to
> achieve this Flink should always be able to make progress unless the
> parallelism is explicitly configured (e.g. a very high parallelism in a
> pipelined region that cannot be fulfilled). Moreover, Flink should be able
> to make use of new resources if the job isn't being run at the maximum
> parallelism already. Removing slots so that the minimum number of required
> slots is still available should also be possible. Maybe one idea could be
> to make the adaptive batch scheduler the default for batch jobs eventually.
> For streaming jobs, we would ideally always use the AdaptiveScheduler
> to give a consistent behaviour.
>
> As a side note: Creating as many subpartitions as the maximum parallelism
> is will result in a one-to-one mapping between sub partitions and key
> groups. If we then also make the non keyed operators work on a set of sub
> partitions that store the operator state, then the sub partitions could be
> seen as some logical work unit/split that is assigned to operators. Having
> such an abstraction could allow us to track which work unit has completed
> which helps with rescaling of operators and maintaining order guarantees,
> for example.
>
> I also left some smaller comments in the wiki.
>
> Cheers,
> Till
>
> On Wed, Oct 20, 2021 at 8:52 AM Lijie Wang <wangdachui9...@gmail.com>
> wrote:
>
> > Hi all,
> >
> >
> > Zhu Zhu and I propose to introduce a new job scheduler to Flink: adaptive
> > batch job scheduler. The new scheduler can automatically decide
> > parallelisms of job vertices for batch jobs, according to the size of
> data
> > volume each vertex needs to process.
> >
> > Major benefits of this scheduler includes:
> >
> >    1. Batch job users can be relieved from parallelism tuning
> >    2. Automatically tuned parallelisms can be vertex level and can better
> >    fit consumed datasets which have a varying volume size every day
> >
> >
> >    1. Vertices from SQL batch jobs can be assigned with different
> >    parallelisms which are automatically tuned
> >    2. It can be the first step towards enabling auto-rebalancing
> workloads
> >    of tasks
> >
> > You can find more details in the FLIP-187[1]. Looking forward to your
> > feedback.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> >
> > Best,
> >
> > Lijie
> >
>

Reply via email to