I had 2 extra comments to Max's reply:

1. About pre-allocating resources:
This could be done through the operator when the standalone deployment mode
is used relatively easily as there we have better control of pods/resources.

2. Session jobs:
There is a FLIP (
https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api
)
to support passing configuration when we submit jobs to the session cluster
through the rest api. Once that goes through, session jobs can also be
scaled in a similar way through the configuration.

Cheers,
Gyula


On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels <m...@apache.org> wrote:

> @Yang
>
> >Since the current auto-scaling needs to fully redeploy the application, it
> may fail to start due to lack of resources.
>
> Great suggestions. I agree that we will have to have to preallocate /
> reserve resources to ensure the rescaling doesn't take longer as expected.
> This is not only a problem when scaling up but also when scaling down
> because any pods surrendered might be taken over by another deployment
> during the rescaling. This would certainly be a case for integrating
> autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
> rescaling API. Alternatively, the operator would have to reserve the
> resources somehow.
>
> >Does auto-scaling have a plan to support jobs which are running in a
> session cluster? It might be a different
>
> We are targeting the application deployment mode for the first version but
> the standalone mode can be supported as soon as we have an integration with
> the scheduler.
>
> > # Horizontal scaling V.S. Vertical scaling
>
> True. We left out vertical scaling intentionally. For now we assume CPU /
> memory is set up by the user. While definitely useful, vertical scaling
> adds another dimension to the scaling problem which we wanted to tackle
> later. I'll update the FLIP to explicitly state that.
>
> -Max
>
>
>
> On Tue, Nov 8, 2022 at 3:59 AM Yang Wang <danrtsey...@gmail.com> wrote:
>
> > Thanks for the fruitful discussion and I am really excited to see that
> the
> > auto-scaling really happens for
> >
> > Flink Kubernetes operator. It will be a very important step to make the
> > long-running Flink job more smoothly.
> >
> > I just have some immature ideas and want to share them here.
> >
> >
> > # Resource Reservation
> >
> > Since the current auto-scaling needs to fully redeploy the application,
> it
> > may fail to start due to lack of resources.
> >
> > I know the Kubernetes operator could rollback to the old spec, but we
> still
> > waste a lot of time to make things worse.
> >
> > I hope the FLIP-250[1](support customized K8s scheduler) could help in
> this
> > case.
> >
> >
> > # Session cluster
> >
> > Does auto-scaling have a plan to support jobs which are running in a
> > session cluster? It might be a different
> >
> > story since we could not use Flink config options to override the  job
> > vertex parallelisms. Given that the SessionJob
> >
> > is also a first-class citizen, we need to document the limitation if not
> > support.
> >
> >
> > # Horizontal scaling V.S. Vertical scaling
> >
> > IIUC, the current proposal does not mention vertical scaling. There might
> > be a chance that the memory/cpu of
> >
> > TaskManager is not configured properly. And this will cause unnecessary
> > multiple scaling executions.
> >
> >
> >
> >
> > [1].
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
> >
> >
> >
> > Best,
> >
> > Yang
> >
> > Maximilian Michels <m...@apache.org> 于2022年11月8日周二 00:31写道:
> >
> > > Thanks for all the interest here and for the great remarks! Gyula
> > > already did a great job addressing the questions here. Let me try to
> > > add additional context:
> > >
> > > @Biao Geng:
> > >
> > > >1.  For source parallelisms, if the user configure a much larger value
> > > than normal, there should be very little pending records though it is
> > > possible to get optimized. But IIUC, in current algorithm, we will not
> > take
> > > actions for this case as the backlog growth rate is almost zero. Is the
> > > understanding right?
> > >
> > > This is actually a corner case which we haven't exactly described in
> > > the FLIP yet. Sources are assumed to only be scaled according to the
> > > backlog but if there is zero backlog, we don't have a number to
> > > compute the parallelism. In this case we tune the source based on the
> > > utilization, just like we do for the other vertices. That could mean
> > > reducing the parallelism in case the source is not doing any work.
> > > Now, in case there is no backlog, we need to be careful that we don't
> > > bounce back to a higher parallelism afterwards.
> > >
> > > >2.  Compared with “scaling out”, “scaling in” is usually more
> dangerous
> > > as it is more likely to lead to negative influence to the downstream
> > jobs.
> > > The min/max load bounds should be useful. I am wondering if it is
> > possible
> > > to have different strategy for “scaling in” to make it more
> conservative.
> > > Or more eagerly, allow custom autoscaling strategy(e.g. time-based
> > > strategy).
> > >
> > > Gyula already mentioned the bounded scale-down. Additionally, we could
> > > add more conservative utilization targets for scale down. For example,
> > > if we targeted 60% utilization for scale-up, we might target 30%
> > > utilization for scale-down, essentially reducing the parallelism
> > > slower. Same as with the limited parallelism scale-down, in the worst
> > > case this will require multiple scale downs. Ideally, the metrics
> > > should be reliable enough such that we do not require such
> > > workarounds.
> > >
> > > @JunRui Lee:
> > >
> > > >In the document, I didn't find the definition of when to trigger
> > > autoScaling after some jobVertex reach the threshold. If I missed is,
> > > please let me know.
> > >
> > > The triggering is supposed to work based on the number of metric
> > > reports to aggregate and the cool down time. Additionally, there are
> > > boundaries for the target rates such that we don't scale on tiny
> > > deviations of the rates. I agree that we want to prevent unnecessary
> > > scalings as much as possible. We'll expand on that.
> > >
> > > @Pedro Silva:
> > >
> > > >Have you considered making metrics collection getting triggered based
> on
> > > events rather than periodic checks?
> > >
> > > Ideally we want to continuously monitor the job to be able to find
> > > bottlenecks. Based on the metrics, we will decide whether to scale or
> > > not. However, if we find that the continuous monitoring is too costly,
> > > we might do it based on signals. Also, if there is some key-turn event
> > > that we must refresh our metrics for, that could also be interesting.
> > > A sudden spike in the backlog could warrant that.
> > >
> > > > Could the FLIP also be used to auto-scale based on state-level
> metrics
> > > at an operator level?
> > >
> > > It could but we don't want to modify the JobGraph which means we are
> > > bound to using task-level parallelism. Setting operator level
> > > parallelism would mean rebuilding the JobGraph which is a tricky thing
> > > to do. It would increase the solution space but also the complexity of
> > > finding a stable scaling configuration.
> > >
> > > @Zheng:
> > >
> > > >After the user opens (advcie), it does not actually perform
> AutoScaling.
> > > It only outputs the notification form of tuning suggestions for the
> > user's
> > > reference.
> > >
> > > That's a great idea. Such a "dry run" feature would give users a
> > > better sense of how the autoscaler would work.
> > >
> > > >I found that FFA 2020 Netflix has a related topic discussing the
> > > automatic tuning function
> > >
> > > Thanks! They actually use a similar idea with respect to backlog-based
> > > scaling. Where their approach differs is that they scale the entire
> > > job based on a target load instead of scaling vertices individually.
> > > They have some interesting ideas like the lookup table for past
> > > scalings and the load prediction based on regressions. I have
> > > intentionally left out those optimizations but I think they can be
> > > useful if implemented well.
> > >
> > > >Can we provide some interfaces for users to customize and implement
> some
> > > tuning algorithms?
> > >
> > > Certainly, I think it is critical that we provide a default
> > > implementation that works well for all kinds of Flink jobs. But users
> > > should have the option to plug in their own implementation. This will
> > > especially be useful for new sources which might not have the backlog
> > > information available as some of the built-in ones like Kafka.
> > >
> > > @Dong:
> > >
> > > >For example, suppose a Kafka Sink subtask has reached I/O bottleneck
> > > when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> > > reach 1 sec?
> > >
> > > The busyTimeMsPerSecond metric captures all the work of the task
> > > excluding backpressure time. So a sink's IO work would be included,
> > > unless it is non-blocking. Typically sinks write to external systems
> > > which can itself backpressure, and should to avoid the sink to be
> > > overwhelmed with data. This can be problematic in case the external
> > > system is the bottleneck and a parallelism increase does not increase
> > > the write rate. However, Gyula has previously thought about this exact
> > > case and proposed to stop scaling those tasks by measuring the
> > > effectiveness of the scaling decision and "giving up" for that vertex
> > > in case we don't yield the expected result.
> > >
> > > >The configuration section does not seem to provide this config. Could
> > you
> > > specify this?
> > >
> > > Thanks! I've added it.
> > >
> > > >How are users expected to specify the per-operator configs?
> > >
> > > That would only be possible when they create their job like you would
> > > normally write a Flink job. As soon as the autoscaler takes over, they
> > > won't be able to change parallelisms. However, it would be conceivable
> > > that we allow manually fixing some of the operator parallelisms but
> > > that should be a last resort and not required.
> > >
> > > >How often will the Flink Kubernetes operator query metrics from
> > > JobManager? Is this configurable?
> > >
> > > This will be configurable. I think the JobManager only aggregates
> > > metrics every 5 seconds. I think something like every 30 seconds makes
> > > sense.
> > >
> > > >5) Could you specify the config name and default value for the
> proposed
> > > configs? 6) Could you add the name/mbean/type for the proposed metrics?
> > >
> > > We will add this soon but feel free to suggest in the wiki page if you
> > > have ideas.
> > >
> > > @Yanfei:
> > >
> > > >It's said that "The first step is collecting metrics for all
> JobVertices
> > > by combining metrics from all the runtime subtasks and computing the
> > > *average*". When the load of the subtasks of an operator is not
> balanced,
> > > do we need to trigger autoScaling? Has the median or some percentiles
> > been
> > > considered?
> > >
> > > It hasn't but the suggestion is very welcome! There are pros and cons
> > > for every aggregation method. The median is certainly something to
> > > look at because, in contrast to the average, it would prevent hot
> > > partitions from triggering a large scale up.
> > >
> > > > IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this proposal,
> > > will we reuse some logic from Reactive Mode?
> > >
> > > I wanted to mention it but I left it out intentionally to avoid
> > > confusion. I think the approach in the reactive mode is very different
> > > from what we are proposing here. Reactive mode is really not about
> > > autoscaling but about reacting to node changes in the cluster. We
> > > specifically want to solve the business logic of autoscaling instead
> > > of focusing on the execution part. For the first version, we plan to
> > > do a full redeploy which job vertex parallelism overrides. However, we
> > > have previously discussed with Chesnay that we could use Flink's old
> > > /rescaling API which would allow a more graceful scaling of the
> > > existing cluster. That would certainly be desirable.
> > >
> > >
> > > -Max
> > >
> > > On Mon, Nov 7, 2022 at 5:08 PM Gyula Fóra <gyula.f...@gmail.com>
> wrote:
> > > >
> > > > @Dong:
> > > >
> > > > Looking at the busyTime metrics in the TaskOMetricGroup it seems that
> > > busy
> > > > time is actually defined as "not idle or (soft) backpressured" . So I
> > > think
> > > > it would give us the correct reading based on what you said about the
> > > Kafka
> > > > sink.
> > > >
> > > > In any case we have to test this and if something is not as we expect
> > we
> > > > should fix the metric.
> > > >
> > > > Gyula
> > > >
> > > > On Mon, Nov 7, 2022 at 5:00 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Thanks for the explanation Gyula. Please see my reply inline.
> > > > >
> > > > > BTW, has the proposed solution been deployed and evaluated with any
> > > > > production workload? If yes, I am wondering if you could share the
> > > > > experience, e.g. what is the likelihood of having regression and
> > > > > improvement respectively after enabling this feature.
> > > > >
> > > > >
> > > > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com>
> > > wrote:
> > > > >
> > > > >> Hi Dong!
> > > > >>
> > > > >> Let me try to answer the questions :)
> > > > >>
> > > > >> 1 : busyTimeMsPerSecond is not specific for CPU, it measures the
> > time
> > > > >> spent in the main record processing loop for an operator if I
> > > > >> understand correctly. This includes IO operations too.
> > > > >>
> > > > >
> > > > > I took a look at the StreamTask::processInput(...)
> > > > > <
> > >
> >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576
> > > >.
> > > > > My understanding of this code is that when KafkaSink (same for
> other
> > > sinks)
> > > > > can not write data out to the network fast enough, recordWriter
> > > > > <
> > >
> >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575
> > > >
> > > > > becomes unavailable and the StreamTask is considered to be
> > > > > soft-back-pressured, instead of being considered busy.
> > > > >
> > > > > If this is the case, then the algorithm currently proposed in the
> > FLIP
> > > > > might under-provision the sink operator's parallelism when the sink
> > > > > operator is the bottleneck. I am not an expert with this piece of
> > code
> > > > > though. I am wondering if you or someone else could double-check
> > this.
> > > > >
> > > > >
> > > > >> 2: We should add this to the FLIP I agree. It would be a Duration
> > > config
> > > > >> with the expected catch up time after rescaling (let's say 5
> > > minutes). It
> > > > >> could be computed based on the current data rate and the
> calculated
> > > max
> > > > >> processing rate after the rescale.
> > > > >>
> > > > >> Great. I am looking forward to the formula :)
> > > > >
> > > > >
> > > > >> 3: In the current proposal we don't have per operator configs.
> > Target
> > > > >> utilization would apply to all operators uniformly.
> > > > >>
> > > > >> 4: It should be configurable, yes.
> > > > >>
> > > > >> 5,6: The names haven't been finalized but I think these are minor
> > > > >> details. We could add concrete names to the FLIP :)
> > > > >>
> > > > >> Sounds good.
> > > > >
> > > > >
> > > > >> Cheers,
> > > > >> Gyula
> > > > >>
> > > > >>
> > > > >> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com>
> > wrote:
> > > > >>
> > > > >>> Hi Max,
> > > > >>>
> > > > >>> Thank you for the proposal. The proposal tackles a very important
> > > issue
> > > > >>> for Flink users and the design looks promising overall!
> > > > >>>
> > > > >>> I have some questions to better understand the proposed public
> > > > >>> interfaces and the algorithm.
> > > > >>>
> > > > >>> 1) The proposal seems to assume that the operator's
> > > busyTimeMsPerSecond
> > > > >>> could reach 1 sec. I believe this is mostly true for cpu-bound
> > > operators.
> > > > >>> Could you confirm that this can also be true for io-bound
> operators
> > > such as
> > > > >>> sinks? For example, suppose a Kafka Sink subtask has reached I/O
> > > bottleneck
> > > > >>> when flushing data out to the Kafka clusters, will
> > > busyTimeMsPerSecond
> > > > >>> reach 1 sec?
> > > > >>>
> > > > >>> 2) It is said that "users can configure a maximum time to fully
> > > process
> > > > >>> the backlog". The configuration section does not seem to provide
> > this
> > > > >>> config. Could you specify this? And any chance this proposal can
> > > provide
> > > > >>> the formula for calculating the new processing rate?
> > > > >>>
> > > > >>> 3) How are users expected to specify the per-operator configs
> (e.g.
> > > > >>> target utilization)? For example, should users specify it
> > > programmatically
> > > > >>> in a DataStream/Table/SQL API?
> > > > >>>
> > > > >>> 4) How often will the Flink Kubernetes operator query metrics
> from
> > > > >>> JobManager? Is this configurable?
> > > > >>>
> > > > >>> 5) Could you specify the config name and default value for the
> > > proposed
> > > > >>> configs?
> > > > >>>
> > > > >>> 6) Could you add the name/mbean/type for the proposed metrics?
> > > > >>>
> > > > >>>
> > > > >>> Cheers,
> > > > >>> Dong
> > > > >>>
> > > > >>>
> > > > >>>
> > >
> >
>

Reply via email to