>> # Horizontal scaling V.S. Vertical scaling
>
>True. We left out vertical scaling intentionally. For now we assume CPU /
memory is set up by the user. While definitely useful, vertical scaling
>adds another dimension to the scaling problem which we wanted to tackle
later. I'll update the FLIP to explicitly state that.

Actually, the proposed algorithm is a kind of hybrid between horizontal and
vertical scaling. The vertical scaling aspect is due to the parallelism
adjustments of each task. Pure horizontal scaling would uniformly adjust
the parallelism of all tasks. The motivation there is to optimize the data
flow and prevent backpressure. At the same time, we don't touch the
underlying container resources or the number of task slots per TaskManager
as one typically would in vertical scaling.

On Tue, Nov 8, 2022 at 11:47 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> I had 2 extra comments to Max's reply:
>
> 1. About pre-allocating resources:
> This could be done through the operator when the standalone deployment mode
> is used relatively easily as there we have better control of
> pods/resources.
>
> 2. Session jobs:
> There is a FLIP (
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api
> )
> to support passing configuration when we submit jobs to the session cluster
> through the rest api. Once that goes through, session jobs can also be
> scaled in a similar way through the configuration.
>
> Cheers,
> Gyula
>
>
> On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels <m...@apache.org> wrote:
>
> > @Yang
> >
> > >Since the current auto-scaling needs to fully redeploy the application,
> it
> > may fail to start due to lack of resources.
> >
> > Great suggestions. I agree that we will have to have to preallocate /
> > reserve resources to ensure the rescaling doesn't take longer as
> expected.
> > This is not only a problem when scaling up but also when scaling down
> > because any pods surrendered might be taken over by another deployment
> > during the rescaling. This would certainly be a case for integrating
> > autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
> > rescaling API. Alternatively, the operator would have to reserve the
> > resources somehow.
> >
> > >Does auto-scaling have a plan to support jobs which are running in a
> > session cluster? It might be a different
> >
> > We are targeting the application deployment mode for the first version
> but
> > the standalone mode can be supported as soon as we have an integration
> with
> > the scheduler.
> >
> > > # Horizontal scaling V.S. Vertical scaling
> >
> > True. We left out vertical scaling intentionally. For now we assume CPU /
> > memory is set up by the user. While definitely useful, vertical scaling
> > adds another dimension to the scaling problem which we wanted to tackle
> > later. I'll update the FLIP to explicitly state that.
> >
> > -Max
> >
> >
> >
> > On Tue, Nov 8, 2022 at 3:59 AM Yang Wang <danrtsey...@gmail.com> wrote:
> >
> > > Thanks for the fruitful discussion and I am really excited to see that
> > the
> > > auto-scaling really happens for
> > >
> > > Flink Kubernetes operator. It will be a very important step to make the
> > > long-running Flink job more smoothly.
> > >
> > > I just have some immature ideas and want to share them here.
> > >
> > >
> > > # Resource Reservation
> > >
> > > Since the current auto-scaling needs to fully redeploy the application,
> > it
> > > may fail to start due to lack of resources.
> > >
> > > I know the Kubernetes operator could rollback to the old spec, but we
> > still
> > > waste a lot of time to make things worse.
> > >
> > > I hope the FLIP-250[1](support customized K8s scheduler) could help in
> > this
> > > case.
> > >
> > >
> > > # Session cluster
> > >
> > > Does auto-scaling have a plan to support jobs which are running in a
> > > session cluster? It might be a different
> > >
> > > story since we could not use Flink config options to override the  job
> > > vertex parallelisms. Given that the SessionJob
> > >
> > > is also a first-class citizen, we need to document the limitation if
> not
> > > support.
> > >
> > >
> > > # Horizontal scaling V.S. Vertical scaling
> > >
> > > IIUC, the current proposal does not mention vertical scaling. There
> might
> > > be a chance that the memory/cpu of
> > >
> > > TaskManager is not configured properly. And this will cause unnecessary
> > > multiple scaling executions.
> > >
> > >
> > >
> > >
> > > [1].
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
> > >
> > >
> > >
> > > Best,
> > >
> > > Yang
> > >
> > > Maximilian Michels <m...@apache.org> 于2022年11月8日周二 00:31写道:
> > >
> > > > Thanks for all the interest here and for the great remarks! Gyula
> > > > already did a great job addressing the questions here. Let me try to
> > > > add additional context:
> > > >
> > > > @Biao Geng:
> > > >
> > > > >1.  For source parallelisms, if the user configure a much larger
> value
> > > > than normal, there should be very little pending records though it is
> > > > possible to get optimized. But IIUC, in current algorithm, we will
> not
> > > take
> > > > actions for this case as the backlog growth rate is almost zero. Is
> the
> > > > understanding right?
> > > >
> > > > This is actually a corner case which we haven't exactly described in
> > > > the FLIP yet. Sources are assumed to only be scaled according to the
> > > > backlog but if there is zero backlog, we don't have a number to
> > > > compute the parallelism. In this case we tune the source based on the
> > > > utilization, just like we do for the other vertices. That could mean
> > > > reducing the parallelism in case the source is not doing any work.
> > > > Now, in case there is no backlog, we need to be careful that we don't
> > > > bounce back to a higher parallelism afterwards.
> > > >
> > > > >2.  Compared with “scaling out”, “scaling in” is usually more
> > dangerous
> > > > as it is more likely to lead to negative influence to the downstream
> > > jobs.
> > > > The min/max load bounds should be useful. I am wondering if it is
> > > possible
> > > > to have different strategy for “scaling in” to make it more
> > conservative.
> > > > Or more eagerly, allow custom autoscaling strategy(e.g. time-based
> > > > strategy).
> > > >
> > > > Gyula already mentioned the bounded scale-down. Additionally, we
> could
> > > > add more conservative utilization targets for scale down. For
> example,
> > > > if we targeted 60% utilization for scale-up, we might target 30%
> > > > utilization for scale-down, essentially reducing the parallelism
> > > > slower. Same as with the limited parallelism scale-down, in the worst
> > > > case this will require multiple scale downs. Ideally, the metrics
> > > > should be reliable enough such that we do not require such
> > > > workarounds.
> > > >
> > > > @JunRui Lee:
> > > >
> > > > >In the document, I didn't find the definition of when to trigger
> > > > autoScaling after some jobVertex reach the threshold. If I missed is,
> > > > please let me know.
> > > >
> > > > The triggering is supposed to work based on the number of metric
> > > > reports to aggregate and the cool down time. Additionally, there are
> > > > boundaries for the target rates such that we don't scale on tiny
> > > > deviations of the rates. I agree that we want to prevent unnecessary
> > > > scalings as much as possible. We'll expand on that.
> > > >
> > > > @Pedro Silva:
> > > >
> > > > >Have you considered making metrics collection getting triggered
> based
> > on
> > > > events rather than periodic checks?
> > > >
> > > > Ideally we want to continuously monitor the job to be able to find
> > > > bottlenecks. Based on the metrics, we will decide whether to scale or
> > > > not. However, if we find that the continuous monitoring is too
> costly,
> > > > we might do it based on signals. Also, if there is some key-turn
> event
> > > > that we must refresh our metrics for, that could also be interesting.
> > > > A sudden spike in the backlog could warrant that.
> > > >
> > > > > Could the FLIP also be used to auto-scale based on state-level
> > metrics
> > > > at an operator level?
> > > >
> > > > It could but we don't want to modify the JobGraph which means we are
> > > > bound to using task-level parallelism. Setting operator level
> > > > parallelism would mean rebuilding the JobGraph which is a tricky
> thing
> > > > to do. It would increase the solution space but also the complexity
> of
> > > > finding a stable scaling configuration.
> > > >
> > > > @Zheng:
> > > >
> > > > >After the user opens (advcie), it does not actually perform
> > AutoScaling.
> > > > It only outputs the notification form of tuning suggestions for the
> > > user's
> > > > reference.
> > > >
> > > > That's a great idea. Such a "dry run" feature would give users a
> > > > better sense of how the autoscaler would work.
> > > >
> > > > >I found that FFA 2020 Netflix has a related topic discussing the
> > > > automatic tuning function
> > > >
> > > > Thanks! They actually use a similar idea with respect to
> backlog-based
> > > > scaling. Where their approach differs is that they scale the entire
> > > > job based on a target load instead of scaling vertices individually.
> > > > They have some interesting ideas like the lookup table for past
> > > > scalings and the load prediction based on regressions. I have
> > > > intentionally left out those optimizations but I think they can be
> > > > useful if implemented well.
> > > >
> > > > >Can we provide some interfaces for users to customize and implement
> > some
> > > > tuning algorithms?
> > > >
> > > > Certainly, I think it is critical that we provide a default
> > > > implementation that works well for all kinds of Flink jobs. But users
> > > > should have the option to plug in their own implementation. This will
> > > > especially be useful for new sources which might not have the backlog
> > > > information available as some of the built-in ones like Kafka.
> > > >
> > > > @Dong:
> > > >
> > > > >For example, suppose a Kafka Sink subtask has reached I/O bottleneck
> > > > when flushing data out to the Kafka clusters, will
> busyTimeMsPerSecond
> > > > reach 1 sec?
> > > >
> > > > The busyTimeMsPerSecond metric captures all the work of the task
> > > > excluding backpressure time. So a sink's IO work would be included,
> > > > unless it is non-blocking. Typically sinks write to external systems
> > > > which can itself backpressure, and should to avoid the sink to be
> > > > overwhelmed with data. This can be problematic in case the external
> > > > system is the bottleneck and a parallelism increase does not increase
> > > > the write rate. However, Gyula has previously thought about this
> exact
> > > > case and proposed to stop scaling those tasks by measuring the
> > > > effectiveness of the scaling decision and "giving up" for that vertex
> > > > in case we don't yield the expected result.
> > > >
> > > > >The configuration section does not seem to provide this config.
> Could
> > > you
> > > > specify this?
> > > >
> > > > Thanks! I've added it.
> > > >
> > > > >How are users expected to specify the per-operator configs?
> > > >
> > > > That would only be possible when they create their job like you would
> > > > normally write a Flink job. As soon as the autoscaler takes over,
> they
> > > > won't be able to change parallelisms. However, it would be
> conceivable
> > > > that we allow manually fixing some of the operator parallelisms but
> > > > that should be a last resort and not required.
> > > >
> > > > >How often will the Flink Kubernetes operator query metrics from
> > > > JobManager? Is this configurable?
> > > >
> > > > This will be configurable. I think the JobManager only aggregates
> > > > metrics every 5 seconds. I think something like every 30 seconds
> makes
> > > > sense.
> > > >
> > > > >5) Could you specify the config name and default value for the
> > proposed
> > > > configs? 6) Could you add the name/mbean/type for the proposed
> metrics?
> > > >
> > > > We will add this soon but feel free to suggest in the wiki page if
> you
> > > > have ideas.
> > > >
> > > > @Yanfei:
> > > >
> > > > >It's said that "The first step is collecting metrics for all
> > JobVertices
> > > > by combining metrics from all the runtime subtasks and computing the
> > > > *average*". When the load of the subtasks of an operator is not
> > balanced,
> > > > do we need to trigger autoScaling? Has the median or some percentiles
> > > been
> > > > considered?
> > > >
> > > > It hasn't but the suggestion is very welcome! There are pros and cons
> > > > for every aggregation method. The median is certainly something to
> > > > look at because, in contrast to the average, it would prevent hot
> > > > partitions from triggering a large scale up.
> > > >
> > > > > IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this
> proposal,
> > > > will we reuse some logic from Reactive Mode?
> > > >
> > > > I wanted to mention it but I left it out intentionally to avoid
> > > > confusion. I think the approach in the reactive mode is very
> different
> > > > from what we are proposing here. Reactive mode is really not about
> > > > autoscaling but about reacting to node changes in the cluster. We
> > > > specifically want to solve the business logic of autoscaling instead
> > > > of focusing on the execution part. For the first version, we plan to
> > > > do a full redeploy which job vertex parallelism overrides. However,
> we
> > > > have previously discussed with Chesnay that we could use Flink's old
> > > > /rescaling API which would allow a more graceful scaling of the
> > > > existing cluster. That would certainly be desirable.
> > > >
> > > >
> > > > -Max
> > > >
> > > > On Mon, Nov 7, 2022 at 5:08 PM Gyula Fóra <gyula.f...@gmail.com>
> > wrote:
> > > > >
> > > > > @Dong:
> > > > >
> > > > > Looking at the busyTime metrics in the TaskOMetricGroup it seems
> that
> > > > busy
> > > > > time is actually defined as "not idle or (soft) backpressured" .
> So I
> > > > think
> > > > > it would give us the correct reading based on what you said about
> the
> > > > Kafka
> > > > > sink.
> > > > >
> > > > > In any case we have to test this and if something is not as we
> expect
> > > we
> > > > > should fix the metric.
> > > > >
> > > > > Gyula
> > > > >
> > > > > On Mon, Nov 7, 2022 at 5:00 PM Dong Lin <lindon...@gmail.com>
> wrote:
> > > > >
> > > > > > Thanks for the explanation Gyula. Please see my reply inline.
> > > > > >
> > > > > > BTW, has the proposed solution been deployed and evaluated with
> any
> > > > > > production workload? If yes, I am wondering if you could share
> the
> > > > > > experience, e.g. what is the likelihood of having regression and
> > > > > > improvement respectively after enabling this feature.
> > > > > >
> > > > > >
> > > > > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > >> Hi Dong!
> > > > > >>
> > > > > >> Let me try to answer the questions :)
> > > > > >>
> > > > > >> 1 : busyTimeMsPerSecond is not specific for CPU, it measures the
> > > time
> > > > > >> spent in the main record processing loop for an operator if I
> > > > > >> understand correctly. This includes IO operations too.
> > > > > >>
> > > > > >
> > > > > > I took a look at the StreamTask::processInput(...)
> > > > > > <
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576
> > > > >.
> > > > > > My understanding of this code is that when KafkaSink (same for
> > other
> > > > sinks)
> > > > > > can not write data out to the network fast enough, recordWriter
> > > > > > <
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575
> > > > >
> > > > > > becomes unavailable and the StreamTask is considered to be
> > > > > > soft-back-pressured, instead of being considered busy.
> > > > > >
> > > > > > If this is the case, then the algorithm currently proposed in the
> > > FLIP
> > > > > > might under-provision the sink operator's parallelism when the
> sink
> > > > > > operator is the bottleneck. I am not an expert with this piece of
> > > code
> > > > > > though. I am wondering if you or someone else could double-check
> > > this.
> > > > > >
> > > > > >
> > > > > >> 2: We should add this to the FLIP I agree. It would be a
> Duration
> > > > config
> > > > > >> with the expected catch up time after rescaling (let's say 5
> > > > minutes). It
> > > > > >> could be computed based on the current data rate and the
> > calculated
> > > > max
> > > > > >> processing rate after the rescale.
> > > > > >>
> > > > > >> Great. I am looking forward to the formula :)
> > > > > >
> > > > > >
> > > > > >> 3: In the current proposal we don't have per operator configs.
> > > Target
> > > > > >> utilization would apply to all operators uniformly.
> > > > > >>
> > > > > >> 4: It should be configurable, yes.
> > > > > >>
> > > > > >> 5,6: The names haven't been finalized but I think these are
> minor
> > > > > >> details. We could add concrete names to the FLIP :)
> > > > > >>
> > > > > >> Sounds good.
> > > > > >
> > > > > >
> > > > > >> Cheers,
> > > > > >> Gyula
> > > > > >>
> > > > > >>
> > > > > >> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >>
> > > > > >>> Hi Max,
> > > > > >>>
> > > > > >>> Thank you for the proposal. The proposal tackles a very
> important
> > > > issue
> > > > > >>> for Flink users and the design looks promising overall!
> > > > > >>>
> > > > > >>> I have some questions to better understand the proposed public
> > > > > >>> interfaces and the algorithm.
> > > > > >>>
> > > > > >>> 1) The proposal seems to assume that the operator's
> > > > busyTimeMsPerSecond
> > > > > >>> could reach 1 sec. I believe this is mostly true for cpu-bound
> > > > operators.
> > > > > >>> Could you confirm that this can also be true for io-bound
> > operators
> > > > such as
> > > > > >>> sinks? For example, suppose a Kafka Sink subtask has reached
> I/O
> > > > bottleneck
> > > > > >>> when flushing data out to the Kafka clusters, will
> > > > busyTimeMsPerSecond
> > > > > >>> reach 1 sec?
> > > > > >>>
> > > > > >>> 2) It is said that "users can configure a maximum time to fully
> > > > process
> > > > > >>> the backlog". The configuration section does not seem to
> provide
> > > this
> > > > > >>> config. Could you specify this? And any chance this proposal
> can
> > > > provide
> > > > > >>> the formula for calculating the new processing rate?
> > > > > >>>
> > > > > >>> 3) How are users expected to specify the per-operator configs
> > (e.g.
> > > > > >>> target utilization)? For example, should users specify it
> > > > programmatically
> > > > > >>> in a DataStream/Table/SQL API?
> > > > > >>>
> > > > > >>> 4) How often will the Flink Kubernetes operator query metrics
> > from
> > > > > >>> JobManager? Is this configurable?
> > > > > >>>
> > > > > >>> 5) Could you specify the config name and default value for the
> > > > proposed
> > > > > >>> configs?
> > > > > >>>
> > > > > >>> 6) Could you add the name/mbean/type for the proposed metrics?
> > > > > >>>
> > > > > >>>
> > > > > >>> Cheers,
> > > > > >>> Dong
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > >
> > >
> >
>

Reply via email to