+1 If there are no further comments, I'll start a vote thread in the next
few days.

-Max


On Tue, Nov 15, 2022 at 2:06 PM Zheng Yu Chen <jam.gz...@gmail.com> wrote:

> @Gyula Good news: FLIP-256 is now finished and merged.
> The FLIP-271 discussion seems to have stopped, and I wonder if there are any
> other comments. Can we move to a vote and start on this exciting feature? 😀
> Maybe I can get involved in developing it.
>
>
>
> On Tue, Nov 8, 2022 at 18:46, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > I had 2 extra comments to Max's reply:
> >
> > 1. About pre-allocating resources:
> > This could be done relatively easily through the operator when the
> > standalone deployment mode is used, as there we have better control of
> > pods/resources.
> >
> > 2. Session jobs:
> > There is a FLIP
> > (https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api)
> > to support passing configuration when we submit jobs to the session
> > cluster through the REST API. Once that goes through, session jobs can
> > also be scaled in a similar way through the configuration.
> >
> > Cheers,
> > Gyula
> >
> >
> > On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels <m...@apache.org> wrote:
> >
> > > @Yang
> > >
> > > >Since the current auto-scaling needs to fully redeploy the application,
> > > it may fail to start due to lack of resources.
> > >
> > > Great suggestions. I agree that we will have to preallocate /
> > > reserve resources to ensure the rescaling doesn't take longer than
> > > expected. This is not only a problem when scaling up but also when
> > > scaling down, because any pods surrendered might be taken over by
> > > another deployment during the rescaling. This would certainly be a case
> > > for integrating autoscaling with the Flink scheduler, e.g. via FLIP-250
> > > or via the rescaling API. Alternatively, the operator would have to
> > > reserve the resources somehow.
> > >
> > > >Does auto-scaling have a plan to support jobs which are running in a
> > > session cluster? It might be a different
> > >
> > > We are targeting the application deployment mode for the first version,
> > > but the standalone mode can be supported as soon as we have an
> > > integration with the scheduler.
> > >
> > > > # Horizontal scaling V.S. Vertical scaling
> > >
> > > True. We left out vertical scaling intentionally. For now we assume
> > > CPU / memory is set up by the user. While definitely useful, vertical
> > > scaling adds another dimension to the scaling problem which we wanted
> > > to tackle later. I'll update the FLIP to explicitly state that.
> > >
> > > -Max
> > >
> > >
> > >
> > > On Tue, Nov 8, 2022 at 3:59 AM Yang Wang <danrtsey...@gmail.com> wrote:
> > >
> > > > Thanks for the fruitful discussion. I am really excited to see
> > > > auto-scaling really happening for the Flink Kubernetes operator. It
> > > > will be a very important step toward making long-running Flink jobs
> > > > run more smoothly.
> > > >
> > > > I just have some preliminary ideas and want to share them here.
> > > >
> > > >
> > > > # Resource Reservation
> > > >
> > > > Since the current auto-scaling needs to fully redeploy the
> > > > application, it may fail to start due to lack of resources.
> > > >
> > > > I know the Kubernetes operator could roll back to the old spec, but
> > > > we would still waste a lot of time, which makes things worse.
> > > >
> > > > I hope FLIP-250 [1] (support for customized K8s schedulers) could
> > > > help in this case.
> > > >
> > > >
> > > > # Session cluster
> > > >
> > > > Does auto-scaling have a plan to support jobs which are running in a
> > > > session cluster? It might be a different story since we could not use
> > > > Flink config options to override the job vertex parallelisms. Given
> > > > that the SessionJob is also a first-class citizen, we need to
> > > > document the limitation if it is not supported.
> > > >
> > > >
> > > > # Horizontal scaling V.S. Vertical scaling
> > > >
> > > > IIUC, the current proposal does not mention vertical scaling. There
> > > > might be a chance that the memory/CPU of the TaskManager is not
> > > > configured properly, and this will cause multiple unnecessary scaling
> > > > executions.
> > > >
> > > >
> > > >
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
> > > >
> > > > Best,
> > > >
> > > > Yang
> > > >
> > > > On Tue, Nov 8, 2022 at 00:31, Maximilian Michels <m...@apache.org> wrote:
> > > >
> > > > > Thanks for all the interest here and for the great remarks! Gyula
> > > > > already did a great job addressing the questions here. Let me try
> > > > > to add additional context:
> > > > >
> > > > > @Biao Geng:
> > > > >
> > > > > >1. For source parallelisms, if the user configures a much larger
> > > > > value than normal, there should be very few pending records, though
> > > > > it could still be optimized. But IIUC, in the current algorithm, we
> > > > > will not take action in this case as the backlog growth rate is
> > > > > almost zero. Is this understanding right?
> > > > >
> > > > > This is actually a corner case which we haven't exactly described
> > > > > in the FLIP yet. Sources are assumed to only be scaled according to
> > > > > the backlog, but if there is zero backlog, we don't have a number
> > > > > to compute the parallelism. In this case we tune the source based
> > > > > on the utilization, just like we do for the other vertices. That
> > > > > could mean reducing the parallelism in case the source is not doing
> > > > > any work. Now, in case there is no backlog, we need to be careful
> > > > > that we don't bounce back to a higher parallelism afterwards.
> > > > >
> > > > > >2. Compared with “scaling out”, “scaling in” is usually more
> > > > > dangerous as it is more likely to have a negative influence on the
> > > > > downstream jobs. The min/max load bounds should be useful. I am
> > > > > wondering if it is possible to have a different strategy for
> > > > > “scaling in” to make it more conservative. Or, going further, to
> > > > > allow a custom autoscaling strategy (e.g. a time-based strategy).
> > > > >
> > > > > Gyula already mentioned the bounded scale-down. Additionally, we
> > > > > could add more conservative utilization targets for scale-down. For
> > > > > example, if we targeted 60% utilization for scale-up, we might
> > > > > target 30% utilization for scale-down, essentially reducing the
> > > > > parallelism more slowly. As with the limited parallelism
> > > > > scale-down, in the worst case this will require multiple
> > > > > scale-downs. Ideally, the metrics should be reliable enough such
> > > > > that we do not require such workarounds.
> > > > >
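As an illustration of the separate scale-up and scale-down targets described above, here is a minimal sketch. It assumes that the parallelism is recomputed proportionally from the average busy time per subtask and that scaling only happens outside the 30%-60% band; the function name, its parameters, and the proportional rule are hypothetical and not taken from the FLIP.

```python
def target_parallelism(parallelism: int, busy_ms_per_second: int,
                       scale_up_target_pct: int = 60,
                       scale_down_target_pct: int = 30) -> int:
    """Hypothetical rule: keep the parallelism while utilization stays
    between the two targets; otherwise rescale toward the nearer target."""
    # 1000 ms of busy time per second corresponds to 100% utilization.
    up_ms = scale_up_target_pct * 10
    down_ms = scale_down_target_pct * 10
    if busy_ms_per_second > up_ms:
        target_ms = up_ms
    elif busy_ms_per_second < down_ms:
        target_ms = down_ms
    else:
        return parallelism
    total_busy_ms = parallelism * busy_ms_per_second
    # Integer ceiling division avoids floating-point rounding surprises.
    return max(1, -(-total_busy_ms // target_ms))
```

With 10 subtasks that are each busy 100 ms per second, this rule suggests 4 subtasks, whereas aiming straight for 60% would suggest 2.
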
> > > > > @JunRui Lee:
> > > > >
> > > > > >In the document, I didn't find the definition of when to trigger
> > > > > autoScaling after some jobVertex reaches the threshold. If I missed
> > > > > it, please let me know.
> > > > >
> > > > > The triggering is supposed to work based on the number of metric
> > > > > reports to aggregate and the cool-down time. Additionally, there
> > > > > are boundaries for the target rates such that we don't scale on
> > > > > tiny deviations of the rates. I agree that we want to prevent
> > > > > unnecessary scalings as much as possible. We'll expand on that.
> > > > >
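The triggering conditions above (a number of metric reports to aggregate, a cool-down time, and boundaries around the target rate) could be combined roughly as follows. This is only a sketch under assumptions: the class name, the default values, and the relative tolerance band are hypothetical, and the actual FLIP may define the conditions differently.

```python
from collections import deque


class ScalingTrigger:
    """Hypothetical trigger combining a report window, a cool-down period,
    and a tolerance band around the target rate."""

    def __init__(self, reports_to_aggregate: int = 5,
                 cooldown_seconds: float = 300.0,
                 tolerance: float = 0.1) -> None:
        self.reports = deque(maxlen=reports_to_aggregate)
        self.cooldown_seconds = cooldown_seconds
        self.tolerance = tolerance
        self.last_scaling_time = None

    def report(self, now: float, observed_rate: float,
               target_rate: float) -> bool:
        """Record one metric report; return True if scaling should trigger."""
        self.reports.append(observed_rate)
        if len(self.reports) < self.reports.maxlen:
            return False  # not enough reports to aggregate yet
        if (self.last_scaling_time is not None
                and now - self.last_scaling_time < self.cooldown_seconds):
            return False  # still cooling down after the last scaling
        average = sum(self.reports) / len(self.reports)
        if abs(average - target_rate) <= self.tolerance * target_rate:
            return False  # deviation too small to justify a rescale
        self.last_scaling_time = now
        self.reports.clear()
        return True
```

Clearing the window after a scaling decision is a design choice of this sketch: reports collected under the old parallelism are not reused for the next decision.
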
> > > > > @Pedro Silva:
> > > > >
> > > > > >Have you considered having metrics collection triggered based on
> > > > > events rather than periodic checks?
> > > > >
> > > > > Ideally we want to continuously monitor the job to be able to find
> > > > > bottlenecks. Based on the metrics, we will decide whether to scale
> > > > > or not. However, if we find that the continuous monitoring is too
> > > > > costly, we might do it based on signals. Also, if there is some
> > > > > key-turn event that we must refresh our metrics for, that could
> > > > > also be interesting. A sudden spike in the backlog could warrant
> > > > > that.
> > > > >
> > > > > > Could the FLIP also be used to auto-scale based on state-level
> > > > > metrics at an operator level?
> > > > >
> > > > > It could, but we don't want to modify the JobGraph, which means we
> > > > > are bound to using task-level parallelism. Setting operator-level
> > > > > parallelism would mean rebuilding the JobGraph, which is a tricky
> > > > > thing to do. It would increase the solution space but also the
> > > > > complexity of finding a stable scaling configuration.
> > > > >
> > > > > @Zheng:
> > > > >
> > > > > >After the user enables it (advice), it does not actually perform
> > > > > AutoScaling. It only outputs tuning suggestions in the form of
> > > > > notifications for the user's reference.
> > > > >
> > > > > That's a great idea. Such a "dry run" feature would give users a
> > > > > better sense of how the autoscaler would work.
> > > > >
> > > > > >I found that FFA 2020 Netflix has a related topic discussing the
> > > > > automatic tuning function
> > > > >
> > > > > Thanks! They actually use a similar idea with respect to
> > > > > backlog-based scaling. Where their approach differs is that they
> > > > > scale the entire job based on a target load instead of scaling
> > > > > vertices individually. They have some interesting ideas like the
> > > > > lookup table for past scalings and the load prediction based on
> > > > > regressions. I have intentionally left out those optimizations but
> > > > > I think they can be useful if implemented well.
> > > > >
> > > > > >Can we provide some interfaces for users to customize and
> > > > > implement some tuning algorithms?
> > > > >
> > > > > Certainly. I think it is critical that we provide a default
> > > > > implementation that works well for all kinds of Flink jobs. But
> > > > > users should have the option to plug in their own implementation.
> > > > > This will be especially useful for new sources which might not
> > > > > have the backlog information available, unlike some of the
> > > > > built-in ones such as Kafka.
> > > > >
> > > > > @Dong:
> > > > >
> > > > > >For example, suppose a Kafka Sink subtask has reached I/O
> > > > > bottleneck when flushing data out to the Kafka clusters, will
> > > > > busyTimeMsPerSecond reach 1 sec?
> > > > >
> > > > > The busyTimeMsPerSecond metric captures all the work of the task
> > > > > excluding backpressure time. So a sink's IO work would be included,
> > > > > unless it is non-blocking. Typically sinks write to external
> > > > > systems which can themselves backpressure, and should, to avoid
> > > > > the sink being overwhelmed with data. This can be problematic in
> > > > > case the external system is the bottleneck and a parallelism
> > > > > increase does not increase the write rate. However, Gyula has
> > > > > previously thought about this exact case and proposed to stop
> > > > > scaling those tasks by measuring the effectiveness of the scaling
> > > > > decision and "giving up" for that vertex in case we don't get the
> > > > > expected result.
> > > > >
> > > > > >The configuration section does not seem to provide this config.
> > > > > Could you specify this?
> > > > >
> > > > > Thanks! I've added it.
> > > > >
> > > > > >How are users expected to specify the per-operator configs?
> > > > >
> > > > > That would only be possible when they create their job, the way
> > > > > you would normally write a Flink job. As soon as the autoscaler
> > > > > takes over, they won't be able to change parallelisms. However, it
> > > > > is conceivable that we allow manually fixing some of the operator
> > > > > parallelisms, but that should be a last resort and not required.
> > > > >
> > > > > >How often will the Flink Kubernetes operator query metrics from
> > > > > JobManager? Is this configurable?
> > > > >
> > > > > This will be configurable. I think the JobManager only aggregates
> > > > > metrics every 5 seconds. I think something like every 30 seconds
> > > > > makes sense.
> > > > >
> > > > > >5) Could you specify the config name and default value for the
> > > > > proposed configs? 6) Could you add the name/mbean/type for the
> > > > > proposed metrics?
> > > > >
> > > > > We will add this soon but feel free to suggest in the wiki page if
> > > > > you have ideas.
> > > > >
> > > > > @Yanfei:
> > > > >
> > > > > >It's said that "The first step is collecting metrics for all
> > > > > JobVertices by combining metrics from all the runtime subtasks and
> > > > > computing the *average*". When the load of the subtasks of an
> > > > > operator is not balanced, do we need to trigger autoScaling? Has
> > > > > the median or some percentiles been considered?
> > > > >
> > > > > It hasn't, but the suggestion is very welcome! There are pros and
> > > > > cons for every aggregation method. The median is certainly
> > > > > something to look at because, in contrast to the average, it would
> > > > > prevent hot partitions from triggering a large scale-up.
> > > > >
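To make the difference between the two aggregation methods concrete, here is a small sketch with made-up numbers. The busy times are hypothetical and only illustrate how a single hot subtask shifts the average but not the median.

```python
from statistics import mean, median

# Hypothetical busy times (ms per second) for five subtasks of one vertex;
# the last subtask handles a hot partition.
busy_ms = [200, 200, 200, 200, 1000]

average_busy = mean(busy_ms)   # pulled upward by the single hot subtask
median_busy = median(busy_ms)  # reflects the typical subtask
```

Here the average is 360 ms while the median stays at 200 ms, so a policy keyed on the median would not scale up because of the one hot subtask.
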
> > > > > > IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this
> > > > > proposal, will we reuse some logic from Reactive Mode?
> > > > >
> > > > > I wanted to mention it but I left it out intentionally to avoid
> > > > > confusion. I think the approach in the reactive mode is very
> > > > > different from what we are proposing here. Reactive mode is really
> > > > > not about autoscaling but about reacting to node changes in the
> > > > > cluster. We specifically want to solve the business logic of
> > > > > autoscaling instead of focusing on the execution part. For the
> > > > > first version, we plan to do a full redeploy with job vertex
> > > > > parallelism overrides. However, we have previously discussed with
> > > > > Chesnay that we could use Flink's old /rescaling API, which would
> > > > > allow a more graceful scaling of the existing cluster. That would
> > > > > certainly be desirable.
> > > > >
> > > > >
> > > > > -Max
> > > > >
> > > > > On Mon, Nov 7, 2022 at 5:08 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > >
> > > > > > @Dong:
> > > > > >
> > > > > > Looking at the busyTime metrics in the TaskIOMetricGroup, it seems
> > > > > > that busy time is actually defined as "not idle or (soft)
> > > > > > backpressured". So I think it would give us the correct reading
> > > > > > based on what you said about the Kafka sink.
> > > > > >
> > > > > > In any case we have to test this, and if something is not as we
> > > > > > expect, we should fix the metric.
> > > > > >
> > > > > > Gyula
> > > > > >
> > > > > > On Mon, Nov 7, 2022 at 5:00 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > >
> > > > > > > Thanks for the explanation Gyula. Please see my reply inline.
> > > > > > >
> > > > > > > BTW, has the proposed solution been deployed and evaluated with
> > > > > > > any production workload? If yes, I am wondering if you could
> > > > > > > share the experience, e.g. how likely regressions and
> > > > > > > improvements each are after enabling this feature.
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > > >
> > > > > > >> Hi Dong!
> > > > > > >>
> > > > > > >> Let me try to answer the questions :)
> > > > > > >>
> > > > > > >> 1: busyTimeMsPerSecond is not specific to CPU; it measures the
> > > > > > >> time spent in the main record processing loop for an operator,
> > > > > > >> if I understand correctly. This includes IO operations too.
> > > > > > >>
> > > > > > >
> > > > > > > I took a look at StreamTask::processInput(...)
> > > > > > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576>.
> > > > > > > My understanding of this code is that when KafkaSink (same for
> > > > > > > other sinks) cannot write data out to the network fast enough,
> > > > > > > recordWriter
> > > > > > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575>
> > > > > > > becomes unavailable and the StreamTask is considered to be
> > > > > > > soft-back-pressured, instead of being considered busy.
> > > > > > >
> > > > > > > If this is the case, then the algorithm currently proposed in
> > > > > > > the FLIP might under-provision the sink operator's parallelism
> > > > > > > when the sink operator is the bottleneck. I am not an expert
> > > > > > > with this piece of code though. I am wondering if you or
> > > > > > > someone else could double-check this.
> > > > > > >
> > > > > > >
> > > > > > >> 2: I agree, we should add this to the FLIP. It would be a
> > > > > > >> Duration config with the expected catch-up time after rescaling
> > > > > > >> (let's say 5 minutes). It could be computed based on the current
> > > > > > >> data rate and the calculated max processing rate after the
> > > > > > >> rescale.
> > > > > > >>
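The thread does not spell out the catch-up formula, so the following is only one plausible reading of "computed based on the current data rate and the calculated max processing rate after the rescale". It assumes constant rates and a known backlog size; all names and parameters are hypothetical and this is not the FLIP's definition.

```python
import math


def catch_up_seconds(backlog_records: float, input_rate: float,
                     max_processing_rate: float) -> float:
    """Estimated time to drain the backlog at the post-rescale capacity.

    Rates are in records per second. Assumes both rates stay constant.
    """
    spare_capacity = max_processing_rate - input_rate
    if spare_capacity <= 0:
        return math.inf  # the backlog never shrinks
    return backlog_records / spare_capacity


def required_processing_rate(backlog_records: float, input_rate: float,
                             catch_up_target_seconds: float) -> float:
    """Processing rate needed to drain the backlog within the target time."""
    return input_rate + backlog_records / catch_up_target_seconds
```

For example, with a backlog of 300,000 records, an input rate of 1,000 records/s, and a 5-minute (300 s) target, this reading asks for a processing rate of 2,000 records/s.
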
> > > > > > > Great. I am looking forward to the formula :)
> > > > > > >
> > > > > > >
> > > > > > >> 3: In the current proposal we don't have per-operator configs.
> > > > > > >> Target utilization would apply to all operators uniformly.
> > > > > > >>
> > > > > > >> 4: It should be configurable, yes.
> > > > > > >>
> > > > > > >> 5,6: The names haven't been finalized but I think these are
> > > > > > >> minor details. We could add concrete names to the FLIP :)
> > > > > > >>
> > > > > > > Sounds good.
> > > > > > >
> > > > > > >
> > > > > > >> Cheers,
> > > > > > >> Gyula
> > > > > > >>
> > > > > > >>
> > > > > > >> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >>
> > > > > > >>> Hi Max,
> > > > > > >>>
> > > > > > >>> Thank you for the proposal. The proposal tackles a very
> > > > > > >>> important issue for Flink users and the design looks promising
> > > > > > >>> overall!
> > > > > > >>>
> > > > > > >>> I have some questions to better understand the proposed public
> > > > > > >>> interfaces and the algorithm.
> > > > > > >>>
> > > > > > >>> 1) The proposal seems to assume that the operator's
> > > > > > >>> busyTimeMsPerSecond could reach 1 sec. I believe this is mostly
> > > > > > >>> true for CPU-bound operators. Could you confirm that this can
> > > > > > >>> also be true for IO-bound operators such as sinks? For example,
> > > > > > >>> suppose a Kafka Sink subtask has reached an I/O bottleneck when
> > > > > > >>> flushing data out to the Kafka clusters, will
> > > > > > >>> busyTimeMsPerSecond reach 1 sec?
> > > > > > >>>
> > > > > > >>> 2) It is said that "users can configure a maximum time to fully
> > > > > > >>> process the backlog". The configuration section does not seem
> > > > > > >>> to provide this config. Could you specify this? And any chance
> > > > > > >>> this proposal can provide the formula for calculating the new
> > > > > > >>> processing rate?
> > > > > > >>>
> > > > > > >>> 3) How are users expected to specify the per-operator configs
> > > > > > >>> (e.g. target utilization)? For example, should users specify
> > > > > > >>> them programmatically in a DataStream/Table/SQL API?
> > > > > > >>>
> > > > > > >>> 4) How often will the Flink Kubernetes operator query metrics
> > > > > > >>> from JobManager? Is this configurable?
> > > > > > >>>
> > > > > > >>> 5) Could you specify the config name and default value for the
> > > > > > >>> proposed configs?
> > > > > > >>>
> > > > > > >>> 6) Could you add the name/mbean/type for the proposed metrics?
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> Cheers,
> > > > > > >>> Dong
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>
> > > > >
> > > >
> > >
> >
>
>
> --
> Best
>
> ConradJam
>
