Thanks for the fruitful discussion. I am really excited to see that
auto-scaling is really happening for the Flink Kubernetes operator. It
will be a very important step towards making long-running Flink jobs run
more smoothly.

I just have some preliminary ideas and want to share them here.


# Resource Reservation

Since the current auto-scaling needs to fully redeploy the application, it
may fail to start due to a lack of resources.

I know the Kubernetes operator could roll back to the old spec, but we
would still have wasted a lot of time, which makes things worse.

I hope FLIP-250[1] (support for customized Kubernetes schedulers) could
help in this case.


# Session cluster

Is there a plan for auto-scaling to support jobs running in a session
cluster? It might be a different story, since we could not use Flink
config options to override the job vertex parallelisms. Given that the
SessionJob is also a first-class citizen, we need to document the
limitation if it is not supported.
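
For reference, my understanding of how the override could work in
application mode, where the operator controls the full deployment spec.
The config key and the vertex ids below are assumed placeholders, not
finalized names:

    import org.apache.flink.configuration.Configuration;

    // Hypothetical sketch: per-vertex parallelism overrides passed through
    // the Flink configuration when the application is redeployed. A running
    // session cluster cannot be handed new config this way for a single
    // job, which is why the SessionJob story is different.
    static Configuration withParallelismOverrides() {
        Configuration conf = new Configuration();
        conf.setString("pipeline.jobvertex-parallelism-overrides",
                "<source-vertex-id>:4,<sink-vertex-id>:2");
        return conf;
    }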


# Horizontal scaling vs. vertical scaling

IIUC, the current proposal does not mention vertical scaling. There is a
chance that the memory/CPU of the TaskManager is not configured properly,
and this will cause multiple unnecessary scaling executions.




[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal



Best,

Yang

On Tue, Nov 8, 2022 at 00:31, Maximilian Michels <m...@apache.org> wrote:

> Thanks for all the interest here and for the great remarks! Gyula
> already did a great job addressing the questions here. Let me try to
> add additional context:
>
> @Biao Geng:
>
> >1. For source parallelisms, if the user configures a much larger value
> >than normal, there should be very few pending records, though it is
> >possible to optimize. But IIUC, in the current algorithm, we will not
> >take actions for this case as the backlog growth rate is almost zero. Is
> >this understanding right?
>
> This is actually a corner case which we haven't exactly described in
> the FLIP yet. Sources are assumed to only be scaled according to the
> backlog but if there is zero backlog, we don't have a number to
> compute the parallelism. In this case we tune the source based on the
> utilization, just like we do for the other vertices. That could mean
> reducing the parallelism in case the source is not doing any work.
> Now, in case there is no backlog, we need to be careful that we don't
> bounce back to a higher parallelism afterwards.
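
To make the fallback concrete, a rough sketch of the decision in Java (the
names and the formulas are illustrative, not the FLIP's final algorithm):

    // Sources are scaled on backlog growth when there is one; with zero
    // backlog we fall back to utilization-based scaling like other vertices.
    static int targetSourceParallelism(
            double backlogGrowthRate,        // records/s; ~0 if over-provisioned
            double perSubtaskProcessingRate, // records/s one subtask can handle
            double avgBusyRatio,             // busyTimeMsPerSecond / 1000
            double targetUtilization,
            int currentParallelism) {
        if (backlogGrowthRate > 0) {
            // Backlog-based: add enough subtasks to absorb the growth.
            return currentParallelism
                    + (int) Math.ceil(backlogGrowthRate / perSubtaskProcessingRate);
        }
        // Zero backlog: tune on utilization; this may reduce the parallelism
        // if the source is mostly idle.
        return Math.max(1,
                (int) Math.ceil(currentParallelism * avgBusyRatio / targetUtilization));
    }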
>
> >2. Compared with “scaling out”, “scaling in” is usually more dangerous,
> >as it is more likely to have a negative influence on the downstream
> >jobs. The min/max load bounds should be useful. I am wondering if it is
> >possible to have a different strategy for “scaling in” to make it more
> >conservative. Or, going further, to allow custom autoscaling strategies
> >(e.g. a time-based strategy).
>
> Gyula already mentioned the bounded scale-down. Additionally, we could
> add more conservative utilization targets for scale down. For example,
> if we targeted 60% utilization for scale-up, we might target 30%
> utilization for scale-down, essentially reducing the parallelism
> slower. Same as with the limited parallelism scale-down, in the worst
> case this will require multiple scale downs. Ideally, the metrics
> should be reliable enough such that we do not require such
> workarounds.
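
To make the asymmetric-target idea concrete, a rough sketch (the 0.6/0.3
values come from the example above and are not proposed defaults; all
names are made up):

    // Scale-up uses a tighter utilization target than scale-down, so
    // parallelism comes down more slowly than it goes up.
    static int newParallelism(int parallelism, double avgBusyRatio) {
        final double scaleUpTarget = 0.6;   // act when hotter than 60% busy
        final double scaleDownTarget = 0.3; // act when cooler than 30% busy
        if (avgBusyRatio > scaleUpTarget) {
            return (int) Math.ceil(parallelism * avgBusyRatio / scaleUpTarget);
        } else if (avgBusyRatio < scaleDownTarget) {
            return Math.max(1,
                    (int) Math.floor(parallelism * avgBusyRatio / scaleDownTarget));
        }
        return parallelism; // within the band: no change
    }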
>
> @JunRui Lee:
>
> >In the document, I didn't find the definition of when to trigger
> >autoscaling after some JobVertex reaches the threshold. If I missed it,
> >please let me know.
>
> The triggering is supposed to work based on the number of metric
> reports to aggregate and the cool down time. Additionally, there are
> boundaries for the target rates such that we don't scale on tiny
> deviations of the rates. I agree that we want to prevent unnecessary
> scalings as much as possible. We'll expand on that.
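
A rough sketch of those guardrails as I read them (all names and
thresholds are made up):

    import java.time.Duration;
    import java.time.Instant;

    // Only evaluate a scaling decision once enough metric reports have been
    // aggregated, the cool-down has elapsed, and the rate deviation is big
    // enough to matter.
    static boolean shouldEvaluateScaling(
            int metricReports, int requiredReports,
            Instant lastScaling, Duration coolDown,
            double relativeRateChange, double minRelativeChange) {
        return metricReports >= requiredReports
                && Instant.now().isAfter(lastScaling.plus(coolDown))
                && relativeRateChange > minRelativeChange;
    }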
>
> @Pedro Silva:
>
> >Have you considered having metrics collection triggered based on events
> >rather than periodic checks?
>
> Ideally we want to continuously monitor the job to be able to find
> bottlenecks. Based on the metrics, we will decide whether to scale or
> not. However, if we find that the continuous monitoring is too costly,
> we might do it based on signals. Also, if there is some key event that
> we must refresh our metrics for, that could also be interesting.
> A sudden spike in the backlog could warrant that.
>
> > Could the FLIP also be used to auto-scale based on state-level metrics
> > at an operator level?
>
> It could but we don't want to modify the JobGraph which means we are
> bound to using task-level parallelism. Setting operator level
> parallelism would mean rebuilding the JobGraph which is a tricky thing
> to do. It would increase the solution space but also the complexity of
> finding a stable scaling configuration.
>
> @Zheng:
>
> >After the user enables it (advice mode), it does not actually perform
> >autoscaling. It only outputs tuning suggestions in notification form for
> >the user's reference.
>
> That's a great idea. Such a "dry run" feature would give users a
> better sense of how the autoscaler would work.
>
> >I found that Netflix had a related talk at FFA 2020 discussing an
> >automatic tuning feature
>
> Thanks! They actually use a similar idea with respect to backlog-based
> scaling. Where their approach differs is that they scale the entire
> job based on a target load instead of scaling vertices individually.
> They have some interesting ideas like the lookup table for past
> scalings and the load prediction based on regressions. I have
> intentionally left out those optimizations but I think they can be
> useful if implemented well.
>
> >Can we provide some interfaces for users to customize and implement
> >some tuning algorithms?
>
> Certainly, I think it is critical that we provide a default
> implementation that works well for all kinds of Flink jobs. But users
> should have the option to plug in their own implementation. This will be
> especially useful for new sources which might not have the backlog
> information available, unlike some of the built-in ones such as Kafka.
>
> @Dong:
>
> >For example, suppose a Kafka Sink subtask has reached an I/O bottleneck
> >when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> >reach 1 sec?
>
> The busyTimeMsPerSecond metric captures all the work of the task
> excluding backpressure time. So a sink's IO work would be included,
> unless it is non-blocking. Typically sinks write to external systems
> which can themselves backpressure, and should, to avoid the sink being
> overwhelmed with data. This can be problematic in case the external
> system is the bottleneck and a parallelism increase does not increase
> the write rate. However, Gyula has previously thought about this exact
> case and proposed to stop scaling those tasks by measuring the
> effectiveness of the scaling decision and "giving up" for that vertex
> in case we don't yield the expected result.
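
A rough sketch of that "giving up" check (illustrative only; the threshold
and names are made up):

    // After a scale-up, compare the observed processing-rate gain with the
    // gain the parallelism change should have produced; if it falls short
    // repeatedly, stop scaling this vertex further.
    static boolean scalingWasIneffective(
            int oldParallelism, int newParallelism,
            double oldProcessingRate, double newProcessingRate) {
        double expectedGain = (double) newParallelism / oldParallelism;
        double observedGain = newProcessingRate / oldProcessingRate;
        return observedGain / expectedGain < 0.5; // made-up threshold
    }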
>
> >The configuration section does not seem to provide this config. Could
> >you specify this?
>
> Thanks! I've added it.
>
> >How are users expected to specify the per-operator configs?
>
> That would only be possible when they create their job like you would
> normally write a Flink job. As soon as the autoscaler takes over, they
> won't be able to change parallelisms. However, it would be conceivable
> that we allow manually fixing some of the operator parallelisms but
> that should be a last resort and not required.
>
> >How often will the Flink Kubernetes operator query metrics from
> >JobManager? Is this configurable?
>
> This will be configurable. I think the JobManager only aggregates
> metrics every 5 seconds. I think something like every 30 seconds makes
> sense.
>
> >5) Could you specify the config name and default value for the proposed
> >configs? 6) Could you add the name/mbean/type for the proposed metrics?
>
> We will add this soon but feel free to suggest in the wiki page if you
> have ideas.
>
> @Yanfei:
>
> >It's said that "The first step is collecting metrics for all JobVertices
> >by combining metrics from all the runtime subtasks and computing the
> >*average*". When the load of the subtasks of an operator is not balanced,
> >do we need to trigger autoscaling? Have the median or some percentiles
> >been considered?
>
> It hasn't but the suggestion is very welcome! There are pros and cons
> for every aggregation method. The median is certainly something to
> look at because, in contrast to the average, it would prevent hot
> partitions from triggering a large scale up.
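
A small sketch of what a percentile-based aggregation could look like
(illustrative; not necessarily what the FLIP will use):

    import java.util.Arrays;

    // Aggregate per-subtask busy time with a percentile instead of the
    // average, so a single hot partition does not dominate the result.
    static double percentile(double[] subtaskBusyTimeMs, double p) {
        double[] sorted = subtaskBusyTimeMs.clone();
        Arrays.sort(sorted);
        int idx = (int) Math.ceil(p / 100.0 * sorted.length) - 1;
        return sorted[Math.max(0, idx)];
    }
    // percentile(new double[] {200, 220, 250, 980}, 50) returns 220, while
    // the average of the same readings is ~413 because of the hot subtask.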
>
> > IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this proposal,
> > will we reuse some logic from Reactive Mode?
>
> I wanted to mention it but I left it out intentionally to avoid
> confusion. I think the approach in the reactive mode is very different
> from what we are proposing here. Reactive mode is really not about
> autoscaling but about reacting to node changes in the cluster. We
> specifically want to solve the business logic of autoscaling instead
> of focusing on the execution part. For the first version, we plan to
> do a full redeploy with job vertex parallelism overrides. However, we
> have previously discussed with Chesnay that we could use Flink's old
> /rescaling API which would allow a more graceful scaling of the
> existing cluster. That would certainly be desirable.
>
>
> -Max
>
> On Mon, Nov 7, 2022 at 5:08 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > @Dong:
> >
> > Looking at the busyTime metrics in the TaskIOMetricGroup, it seems that
> > busy time is actually defined as "not idle or (soft) backpressured". So
> > I think it would give us the correct reading based on what you said
> > about the Kafka sink.
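
For reference, my rough mental model of how the task-level metrics relate,
per second of wall-clock time (so "busy" already excludes both idle and
backpressured time):

    busyTimeMsPerSecond + idleTimeMsPerSecond + backPressuredTimeMsPerSecond ≈ 1000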
> >
> > In any case we have to test this and if something is not as we expect we
> > should fix the metric.
> >
> > Gyula
> >
> > On Mon, Nov 7, 2022 at 5:00 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Thanks for the explanation Gyula. Please see my reply inline.
> > >
> > > BTW, has the proposed solution been deployed and evaluated with any
> > > production workload? If yes, I am wondering if you could share the
> > > experience, e.g. what is the likelihood of having regression and
> > > improvement respectively after enabling this feature.
> > >
> > >
> > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >
> > >> Hi Dong!
> > >>
> > >> Let me try to answer the questions :)
> > >>
> > >> 1: busyTimeMsPerSecond is not specific to CPU; it measures the time
> > >> spent in the main record processing loop for an operator if I
> > >> understand correctly. This includes IO operations too.
> > >>
> > >
> > > I took a look at the StreamTask::processInput(...)
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576>.
> > > My understanding of this code is that when KafkaSink (same for other
> > > sinks) cannot write data out to the network fast enough, recordWriter
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575>
> > > becomes unavailable and the StreamTask is considered to be
> > > soft-back-pressured, instead of being considered busy.
> > >
> > > If this is the case, then the algorithm currently proposed in the FLIP
> > > might under-provision the sink operator's parallelism when the sink
> > > operator is the bottleneck. I am not an expert with this piece of code
> > > though. I am wondering if you or someone else could double-check this.
> > >
> > >
> > >> 2: We should add this to the FLIP, I agree. It would be a Duration
> > >> config with the expected catch-up time after rescaling (let's say 5
> > >> minutes). It could be computed based on the current data rate and the
> > >> calculated max processing rate after the rescale.
> > >>
> > > Great. I am looking forward to the formula :)
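
In the meantime, a rough guess at what such a formula could look like
(purely illustrative, not from the FLIP; all names are made up):

    // The new capacity must cover the live input rate AND drain the current
    // backlog within the configured catch-up duration.
    static int parallelismForCatchUp(
            int currentParallelism,
            double avgInputRate,             // records/s arriving right now
            double currentMaxProcessingRate, // records/s the vertex handles today
            double backlogSize,              // records currently queued
            java.time.Duration catchUpTime) {
        double requiredRate = avgInputRate + backlogSize / catchUpTime.toSeconds();
        return (int) Math.ceil(
                currentParallelism * requiredRate / currentMaxProcessingRate);
    }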
> > >
> > >
> > >> 3: In the current proposal we don't have per operator configs. Target
> > >> utilization would apply to all operators uniformly.
> > >>
> > >> 4: It should be configurable, yes.
> > >>
> > >> 5,6: The names haven't been finalized but I think these are minor
> > >> details. We could add concrete names to the FLIP :)
> > >>
> > >> Sounds good.
> > >
> > >
> > >> Cheers,
> > >> Gyula
> > >>
> > >>
> > >> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
> > >>
> > >>> Hi Max,
> > >>>
> > >>> Thank you for the proposal. The proposal tackles a very important
> issue
> > >>> for Flink users and the design looks promising overall!
> > >>>
> > >>> I have some questions to better understand the proposed public
> > >>> interfaces and the algorithm.
> > >>>
> > >>> 1) The proposal seems to assume that the operator's
> > >>> busyTimeMsPerSecond could reach 1 sec. I believe this is mostly true
> > >>> for cpu-bound operators. Could you confirm that this can also be
> > >>> true for io-bound operators such as sinks? For example, suppose a
> > >>> Kafka Sink subtask has reached I/O bottleneck when flushing data out
> > >>> to the Kafka clusters, will busyTimeMsPerSecond reach 1 sec?
> > >>>
> > >>> 2) It is said that "users can configure a maximum time to fully
> > >>> process the backlog". The configuration section does not seem to
> > >>> provide this config. Could you specify this? And any chance this
> > >>> proposal can provide the formula for calculating the new processing
> > >>> rate?
> > >>>
> > >>> 3) How are users expected to specify the per-operator configs (e.g.
> > >>> target utilization)? For example, should users specify it
> > >>> programmatically in a DataStream/Table/SQL API?
> > >>>
> > >>> 4) How often will the Flink Kubernetes operator query metrics from
> > >>> JobManager? Is this configurable?
> > >>>
> > >>> 5) Could you specify the config name and default value for the
> > >>> proposed configs?
> > >>>
> > >>> 6) Could you add the name/mbean/type for the proposed metrics?
> > >>>
> > >>>
> > >>> Cheers,
> > >>> Dong
> > >>>
> > >>>
> > >>>
>
