@Yang

>Since the current auto-scaling needs to fully redeploy the application, it
may fail to start due to lack of resources.

Great suggestions. I agree that we will have to preallocate /
reserve resources to ensure the rescaling doesn't take longer than expected.
This is not only a problem when scaling up but also when scaling down
because any pods surrendered might be taken over by another deployment
during the rescaling. This would certainly be a case for integrating
autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
rescaling API. Alternatively, the operator would have to reserve the
resources somehow.

>Does auto-scaling have a plan to support jobs which are running in a
session cluster? It might be a different

We are targeting the application deployment mode for the first version but
the standalone mode can be supported as soon as we have an integration with
the scheduler.

> # Horizontal scaling V.S. Vertical scaling

True. We left out vertical scaling intentionally. For now we assume CPU /
memory is set up by the user. While definitely useful, vertical scaling
adds another dimension to the scaling problem which we wanted to tackle
later. I'll update the FLIP to explicitly state that.

-Max



On Tue, Nov 8, 2022 at 3:59 AM Yang Wang <danrtsey...@gmail.com> wrote:

> Thanks for the fruitful discussion. I am really excited to see that
> auto-scaling is really happening for the Flink Kubernetes operator. It will
> be a very important step toward making long-running Flink jobs run more
> smoothly.
>
> I just have some immature ideas and want to share them here.
>
>
> # Resource Reservation
>
> Since the current auto-scaling needs to fully redeploy the application, it
> may fail to start due to lack of resources.
>
> I know the Kubernetes operator could roll back to the old spec, but we
> would still waste a lot of time, which makes things worse.
>
> I hope FLIP-250 [1] (support for customized K8s schedulers) could help in
> this case.
>
>
> # Session cluster
>
> Is there a plan for auto-scaling to support jobs running in a session
> cluster? It might be a different story, since we could not use Flink config
> options to override the job vertex parallelisms. Given that the SessionJob
> is also a first-class citizen, we need to document the limitation if it is
> not supported.
>
>
> # Horizontal scaling V.S. Vertical scaling
>
> IIUC, the current proposal does not mention vertical scaling. There is a
> chance that the memory/CPU of the TaskManager is not configured properly,
> which would cause multiple unnecessary scaling executions.
>
>
>
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
>
>
>
> Best,
>
> Yang
>
> On Tue, Nov 8, 2022 at 00:31, Maximilian Michels <m...@apache.org> wrote:
>
> > Thanks for all the interest here and for the great remarks! Gyula
> > already did a great job addressing the questions here. Let me try to
> > add additional context:
> >
> > @Biao Geng:
> >
> > >1. For source parallelisms, if the user configures a much larger value
> > than normal, there should be very few pending records, though there is
> > still room for optimization. But IIUC, in the current algorithm, we will
> > not take action in this case, as the backlog growth rate is almost zero.
> > Is this understanding right?
> >
> > This is actually a corner case which we haven't exactly described in
> > the FLIP yet. Sources are assumed to only be scaled according to the
> > backlog but if there is zero backlog, we don't have a number to
> > compute the parallelism. In this case we tune the source based on the
> > utilization, just like we do for the other vertices. That could mean
> > reducing the parallelism in case the source is not doing any work.
> > Now, in case there is no backlog, we need to be careful that we don't
> > bounce back to a higher parallelism afterwards.
> >
> > >2. Compared with “scaling out”, “scaling in” is usually more dangerous,
> > as it is more likely to negatively affect downstream jobs. The min/max
> > load bounds should be useful. I am wondering if it is possible to have a
> > different strategy for “scaling in” to make it more conservative. Or,
> > going further, to allow custom autoscaling strategies (e.g. a time-based
> > strategy).
> >
> > Gyula already mentioned the bounded scale-down. Additionally, we could
> > add more conservative utilization targets for scale down. For example,
> > if we targeted 60% utilization for scale-up, we might target 30%
> > utilization for scale-down, essentially reducing the parallelism more
> > slowly. As with the bounded scale-down, in the worst case this will
> > require multiple scale-downs. Ideally, the metrics
> > should be reliable enough such that we do not require such
> > workarounds.
> >
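A minimal sketch of the separate scale-up / scale-down targets described above. The function name, the integer-percent inputs, and the "do nothing between the two targets" band are assumptions made for illustration; the FLIP does not specify them.

```python
def target_parallelism(current_parallelism, utilization_pct,
                       scale_up_target_pct=60, scale_down_target_pct=30):
    """Hypothetical helper: derive a new parallelism from observed utilization.

    utilization_pct is the busy share of a vertex in percent (0..100).
    Above the scale-up target we size the vertex for that target; below
    the scale-down target we size it for the (lower) scale-down target,
    which shrinks the vertex less aggressively. In between, nothing changes.
    """
    if utilization_pct > scale_up_target_pct:
        target_pct = scale_up_target_pct
    elif utilization_pct < scale_down_target_pct:
        target_pct = scale_down_target_pct
    else:
        return current_parallelism
    # Integer ceiling division avoids floating-point rounding surprises.
    work = current_parallelism * utilization_pct
    return max(1, (work + target_pct - 1) // target_pct)


# A vertex with parallelism 12 at 20% utilization shrinks to 8 here,
# whereas sizing it for the 60% target would shrink it to 4.
print(target_parallelism(12, 20))
print(target_parallelism(12, 90))
```

With these assumed defaults, a lightly loaded vertex needs more than one step to reach its final size, which is the "multiple scale-downs" trade-off mentioned above.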
> > @JunRui Lee:
> >
> > >In the document, I didn't find the definition of when to trigger
> > autoScaling after some jobVertex reaches the threshold. If I missed it,
> > please let me know.
> >
> > The triggering is supposed to work based on the number of metric
> > reports to aggregate and the cool down time. Additionally, there are
> > boundaries for the target rates such that we don't scale on tiny
> > deviations of the rates. I agree that we want to prevent unnecessary
> > scalings as much as possible. We'll expand on that.
> >
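A rough sketch of how these three conditions could be combined into one trigger check. All names and default values below (minimum report count, cool-down length, tolerance) are placeholders chosen for illustration, not values from the FLIP.

```python
def should_rescale(num_reports, seconds_since_last_scaling,
                   observed_rate, target_rate,
                   min_reports=10, cooldown_seconds=600, tolerance=0.1):
    """Hypothetical trigger check combining the three conditions above.

    Rescale only if enough metric reports were aggregated, the cool-down
    since the last scaling has passed, and the observed rate deviates from
    the target rate by more than the tolerance (as a fraction of the target).
    """
    if target_rate <= 0:
        raise ValueError("target_rate must be positive")
    if num_reports < min_reports:
        return False  # not enough data aggregated yet
    if seconds_since_last_scaling < cooldown_seconds:
        return False  # still cooling down from the previous scaling
    deviation = abs(observed_rate - target_rate) / target_rate
    return deviation > tolerance


print(should_rescale(10, 900, 1300.0, 1000.0))  # large deviation
print(should_rescale(10, 900, 1050.0, 1000.0))  # within tolerance
```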
> > @Pedro Silva:
> >
> > >Have you considered triggering metrics collection based on events
> > rather than periodic checks?
> >
> > Ideally we want to continuously monitor the job to be able to find
> > bottlenecks. Based on the metrics, we will decide whether to scale or
> > not. However, if we find that the continuous monitoring is too costly,
> > we might do it based on signals. Also, if there is some key-turn event
> > that we must refresh our metrics for, that could also be interesting.
> > A sudden spike in the backlog could warrant that.
> >
> > > Could the FLIP also be used to auto-scale based on state-level metrics
> > at an operator level?
> >
> > It could but we don't want to modify the JobGraph which means we are
> > bound to using task-level parallelism. Setting operator level
> > parallelism would mean rebuilding the JobGraph which is a tricky thing
> > to do. It would increase the solution space but also the complexity of
> > finding a stable scaling configuration.
> >
> > @Zheng:
> >
> > >After the user enables it (advice mode), it does not actually perform
> > AutoScaling. It only outputs tuning suggestions as notifications for the
> > user's reference.
> >
> > That's a great idea. Such a "dry run" feature would give users a
> > better sense of how the autoscaler would work.
> >
> > >I found that FFA 2020 Netflix has a related topic discussing the
> > automatic tuning function
> >
> > Thanks! They actually use a similar idea with respect to backlog-based
> > scaling. Where their approach differs is that they scale the entire
> > job based on a target load instead of scaling vertices individually.
> > They have some interesting ideas like the lookup table for past
> > scalings and the load prediction based on regressions. I have
> > intentionally left out those optimizations but I think they can be
> > useful if implemented well.
> >
> > >Can we provide some interfaces for users to customize and implement some
> > tuning algorithms?
> >
> > Certainly, I think it is critical that we provide a default
> > implementation that works well for all kinds of Flink jobs. But users
> > should have the option to plug in their own implementation. This will
> > especially be useful for new sources which might not have the backlog
> > information available the way some built-in ones, like Kafka, do.
> >
> > @Dong:
> >
> > >For example, suppose a Kafka Sink subtask has reached I/O bottleneck
> > when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> > reach 1 sec?
> >
> > The busyTimeMsPerSecond metric captures all the work of the task
> > excluding backpressure time. So a sink's IO work would be included,
> > unless it is non-blocking. Typically sinks write to external systems
> > which can themselves backpressure, and should, to avoid the sink being
> > overwhelmed with data. This can be problematic in case the external
> > system is the bottleneck and a parallelism increase does not increase
> > the write rate. However, Gyula has previously thought about this exact
> > case and proposed to stop scaling those tasks by measuring the
> > effectiveness of the scaling decision and "giving up" for that vertex
> > in case we don't yield the expected result.
> >
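One possible way to express the "measure the effectiveness and give up" idea, purely as an illustration. The linear-speedup expectation, the function name, and the 50% threshold are assumptions; the thread does not define how effectiveness would be measured.

```python
def scaling_was_effective(rate_before, rate_after,
                          parallelism_before, parallelism_after,
                          min_fraction=0.5):
    """Hypothetical effectiveness check for a scale-up decision.

    Assumes the processing rate would ideally grow linearly with the
    parallelism. The scale-up counts as effective if at least
    min_fraction of that ideal gain was actually observed.
    """
    if parallelism_after <= parallelism_before:
        raise ValueError("only scale-ups are evaluated in this sketch")
    expected_gain = rate_before * (parallelism_after / parallelism_before - 1)
    actual_gain = rate_after - rate_before
    return actual_gain >= min_fraction * expected_gain


# Doubling the parallelism of a sink that is throttled by its external
# system barely changes the write rate, so the autoscaler could stop
# scaling that vertex.
print(scaling_was_effective(1000.0, 1100.0, 4, 8))
print(scaling_was_effective(1000.0, 1900.0, 4, 8))
```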
> > >The configuration section does not seem to provide this config. Could
> > you specify this?
> >
> > Thanks! I've added it.
> >
> > >How are users expected to specify the per-operator configs?
> >
> > That would only be possible when they create their job like you would
> > normally write a Flink job. As soon as the autoscaler takes over, they
> > won't be able to change parallelisms. However, it would be conceivable
> > that we allow manually fixing some of the operator parallelisms but
> > that should be a last resort and not required.
> >
> > >How often will the Flink Kubernetes operator query metrics from
> > JobManager? Is this configurable?
> >
> > This will be configurable. I think the JobManager only aggregates
> > metrics every 5 seconds. I think something like every 30 seconds makes
> > sense.
> >
> > >5) Could you specify the config name and default value for the proposed
> > configs? 6) Could you add the name/mbean/type for the proposed metrics?
> >
> > We will add this soon but feel free to suggest in the wiki page if you
> > have ideas.
> >
> > @Yanfei:
> >
> > >It's said that "The first step is collecting metrics for all JobVertices
> > by combining metrics from all the runtime subtasks and computing the
> > *average*". When the load of the subtasks of an operator is not balanced,
> > do we need to trigger autoScaling? Have the median or some percentiles
> > been considered?
> >
> > It hasn't but the suggestion is very welcome! There are pros and cons
> > for every aggregation method. The median is certainly something to
> > look at because, in contrast to the average, it would prevent hot
> > partitions from triggering a large scale up.
> >
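A small illustration of the difference, using made-up busy-time samples for five subtasks of one vertex. The helper name and the numbers are hypothetical; only the Python standard library is used.

```python
from statistics import mean, median


def aggregate_busy_time(busy_ms_per_second, method="median"):
    """Hypothetical aggregation of per-subtask busy time for one vertex."""
    if method == "mean":
        return mean(busy_ms_per_second)
    if method == "median":
        return median(busy_ms_per_second)
    raise ValueError(f"unknown aggregation method: {method}")


# Four evenly loaded subtasks and one hot partition (values in ms per second).
samples = [200, 210, 190, 205, 1000]
print(aggregate_busy_time(samples, "mean"))    # pulled up by the hot subtask
print(aggregate_busy_time(samples, "median"))  # stays near the typical load
```

The flip side is that the median hides the hot subtask entirely, which is one of the "cons" an aggregation choice would have to weigh.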
> > > IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this proposal,
> > will we reuse some logic from Reactive Mode?
> >
> > I wanted to mention it but I left it out intentionally to avoid
> > confusion. I think the approach in the reactive mode is very different
> > from what we are proposing here. Reactive mode is really not about
> > autoscaling but about reacting to node changes in the cluster. We
> > specifically want to solve the business logic of autoscaling instead
> > of focusing on the execution part. For the first version, we plan to
> > do a full redeploy with job vertex parallelism overrides. However, we
> > have previously discussed with Chesnay that we could use Flink's old
> > /rescaling API which would allow a more graceful scaling of the
> > existing cluster. That would certainly be desirable.
> >
> >
> > -Max
> >
> > On Mon, Nov 7, 2022 at 5:08 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >
> > > @Dong:
> > >
> > > Looking at the busyTime metrics in the TaskIOMetricGroup, it seems that
> > > busy time is actually defined as "not idle or (soft) backpressured". So
> > > I think it would give us the correct reading based on what you said
> > > about the Kafka sink.
> > >
> > > In any case we have to test this, and if something is not as we expect,
> > > we should fix the metric.
> > >
> > > Gyula
> > >
> > > On Mon, Nov 7, 2022 at 5:00 PM Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Thanks for the explanation Gyula. Please see my reply inline.
> > > >
> > > > BTW, has the proposed solution been deployed and evaluated with any
> > > > production workload? If yes, I am wondering if you could share the
> > > > experience, e.g. what is the likelihood of having regression and
> > > > improvement respectively after enabling this feature.
> > > >
> > > >
> > > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > >
> > > >> Hi Dong!
> > > >>
> > > >> Let me try to answer the questions :)
> > > >>
> > > >> 1: busyTimeMsPerSecond is not specific to CPU; it measures the time
> > > >> spent in the main record processing loop for an operator, if I
> > > >> understand correctly. This includes IO operations too.
> > > >>
> > > >
> > > > I took a look at StreamTask::processInput(...)
> > > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576>.
> > > > My understanding of this code is that when KafkaSink (same for other
> > > > sinks) cannot write data out to the network fast enough, recordWriter
> > > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575>
> > > > becomes unavailable and the StreamTask is considered to be
> > > > soft-back-pressured, instead of being considered busy.
> > > >
> > > > If this is the case, then the algorithm currently proposed in the
> > > > FLIP might under-provision the sink operator's parallelism when the
> > > > sink operator is the bottleneck. I am not an expert with this piece
> > > > of code though. I am wondering if you or someone else could
> > > > double-check this.
> > > >
> > > >
> > > >> 2: I agree, we should add this to the FLIP. It would be a Duration
> > > >> config with the expected catch-up time after rescaling (let's say 5
> > > >> minutes). It could be computed based on the current data rate and the
> > > >> calculated max processing rate after the rescale.
> > > >>
> > > > Great. I am looking forward to the formula :)
> > > >
> > > >
> > > >> 3: In the current proposal we don't have per-operator configs.
> > > >> Target utilization would apply to all operators uniformly.
> > > >>
> > > >> 4: It should be configurable, yes.
> > > >>
> > > >> 5,6: The names haven't been finalized but I think these are minor
> > > >> details. We could add concrete names to the FLIP :)
> > > >>
> > > > Sounds good.
> > > >
> > > >
> > > >> Cheers,
> > > >> Gyula
> > > >>
> > > >>
> > > >> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >>
> > > >>> Hi Max,
> > > >>>
> > > >>> Thank you for the proposal. The proposal tackles a very important
> > > >>> issue for Flink users and the design looks promising overall!
> > > >>>
> > > >>> I have some questions to better understand the proposed public
> > > >>> interfaces and the algorithm.
> > > >>>
> > > >>> 1) The proposal seems to assume that the operator's
> > > >>> busyTimeMsPerSecond could reach 1 sec. I believe this is mostly true
> > > >>> for CPU-bound operators. Could you confirm that this can also be
> > > >>> true for IO-bound operators such as sinks? For example, suppose a
> > > >>> Kafka Sink subtask has reached an I/O bottleneck when flushing data
> > > >>> out to the Kafka clusters, will busyTimeMsPerSecond reach 1 sec?
> > > >>>
> > > >>> 2) It is said that "users can configure a maximum time to fully
> > > >>> process the backlog". The configuration section does not seem to
> > > >>> provide this config. Could you specify this? And is there any chance
> > > >>> this proposal can provide the formula for calculating the new
> > > >>> processing rate?
> > > >>>
> > > >>> 3) How are users expected to specify the per-operator configs (e.g.
> > > >>> target utilization)? For example, should users specify it
> > > >>> programmatically in a DataStream/Table/SQL API?
> > > >>>
> > > >>> 4) How often will the Flink Kubernetes operator query metrics from
> > > >>> JobManager? Is this configurable?
> > > >>>
> > > >>> 5) Could you specify the config name and default value for the
> > > >>> proposed configs?
> > > >>>
> > > >>> 6) Could you add the name/mbean/type for the proposed metrics?
> > > >>>
> > > >>>
> > > >>> Cheers,
> > > >>> Dong
> > > >>>
> > > >>>
> > > >>>
> >
>
