Thanks for all the interest here and for the great remarks! Gyula
already did a great job addressing the questions here. Let me try to
add additional context:

@Biao Geng:

>1.  For source parallelisms, if the user configure a much larger value than 
>normal, there should be very little pending records though it is possible to 
>get optimized. But IIUC, in current algorithm, we will not take actions for 
>this case as the backlog growth rate is almost zero. Is the understanding 
>right?

This is actually a corner case which we haven't exactly described in
the FLIP yet. Sources are assumed to only be scaled according to the
backlog but if there is zero backlog, we don't have a number to
compute the parallelism. In this case we tune the source based on the
utilization, just like we do for the other vertices. That could mean
reducing the parallelism in case the source is not doing any work.
Now, in case there is no backlog, we need to be careful that we don't
bounce back to a higher parallelism afterwards.

>2.  Compared with “scaling out”, “scaling in” is usually more dangerous as it 
>is more likely to lead to negative influence to the downstream jobs. The 
>min/max load bounds should be useful. I am wondering if it is possible to have 
>different strategy for “scaling in” to make it more conservative. Or more 
>eagerly, allow custom autoscaling strategy(e.g. time-based strategy).

Gyula already mentioned the bounded scale-down. Additionally, we could
add more conservative utilization targets for scale down. For example,
if we targeted 60% utilization for scale-up, we might target 30%
utilization for scale-down, essentially reducing the parallelism
slower. Same as with the limited parallelism scale-down, in the worst
case this will require multiple scale downs. Ideally, the metrics
should be reliable enough such that we do not require such
workarounds.

@JunRui Lee:

>In the document, I didn't find the definition of when to trigger autoScaling 
>after some jobVertex reach the threshold. If I missed is, please let me know.

The triggering is supposed to work based on the number of metric
reports to aggregate and the cool down time. Additionally, there are
boundaries for the target rates such that we don't scale on tiny
deviations of the rates. I agree that we want to prevent unnecessary
scalings as much as possible. We'll expand on that.

@Pedro Silva:

>Have you considered making metrics collection getting triggered based on 
>events rather than periodic checks?

Ideally we want to continuously monitor the job to be able to find
bottlenecks. Based on the metrics, we will decide whether to scale or
not. However, if we find that the continuous monitoring is too costly,
we might do it based on signals. Also, if there is some key-turn event
that we must refresh our metrics for, that could also be interesting.
A sudden spike in the backlog could warrant that.

> Could the FLIP also be used to auto-scale based on state-level metrics at an 
> operator level?

It could but we don't want to modify the JobGraph which means we are
bound to using task-level parallelism. Setting operator level
parallelism would mean rebuilding the JobGraph which is a tricky thing
to do. It would increase the solution space but also the complexity of
finding a stable scaling configuration.

@Zheng:

>After the user opens (advcie), it does not actually perform AutoScaling. It 
>only outputs the notification form of tuning suggestions for the user's 
>reference.

That's a great idea. Such a "dry run" feature would give users a
better sense of how the autoscaler would work.

>I found that FFA 2020 Netflix has a related topic discussing the automatic 
>tuning function

Thanks! They actually use a similar idea with respect to backlog-based
scaling. Where their approach differs is that they scale the entire
job based on a target load instead of scaling vertices individually.
They have some interesting ideas like the lookup table for past
scalings and the load prediction based on regressions. I have
intentionally left out those optimizations but I think they can be
useful if implemented well.

>Can we provide some interfaces for users to customize and implement some 
>tuning algorithms?

Certainly, I think it is critical that we provide a default
implementation that works well for all kinds of Flink jobs. But users
should have the option to plug in their own implementation. This will
especially be useful for new sources which might not have the backlog
information available as some of the built-in ones like Kafka.

@Dong:

>For example, suppose a Kafka Sink subtask has reached I/O bottleneck
when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
reach 1 sec?

The busyTimeMsPerSecond metric captures all the work of the task
excluding backpressure time. So a sink's IO work would be included,
unless it is non-blocking. Typically sinks write to external systems
which can itself backpressure, and should to avoid the sink to be
overwhelmed with data. This can be problematic in case the external
system is the bottleneck and a parallelism increase does not increase
the write rate. However, Gyula has previously thought about this exact
case and proposed to stop scaling those tasks by measuring the
effectiveness of the scaling decision and "giving up" for that vertex
in case we don't yield the expected result.

>The configuration section does not seem to provide this config. Could you 
>specify this?

Thanks! I've added it.

>How are users expected to specify the per-operator configs?

That would only be possible when they create their job like you would
normally write a Flink job. As soon as the autoscaler takes over, they
won't be able to change parallelisms. However, it would be conceivable
that we allow manually fixing some of the operator parallelisms but
that should be a last resort and not required.

>How often will the Flink Kubernetes operator query metrics from JobManager? Is 
>this configurable?

This will be configurable. I think the JobManager only aggregates
metrics every 5 seconds. I think something like every 30 seconds makes
sense.

>5) Could you specify the config name and default value for the proposed 
>configs? 6) Could you add the name/mbean/type for the proposed metrics?

We will add this soon but feel free to suggest in the wiki page if you
have ideas.

@Yanfei:

>It's said that "The first step is collecting metrics for all JobVertices by 
>combining metrics from all the runtime subtasks and computing the *average*". 
>When the load of the subtasks of an operator is not balanced, do we need to 
>trigger autoScaling? Has the median or some percentiles been considered?

It hasn't but the suggestion is very welcome! There are pros and cons
for every aggregation method. The median is certainly something to
look at because, in contrast to the average, it would prevent hot
partitions from triggering a large scale up.

> IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this proposal, will we 
> reuse some logic from Reactive Mode?

I wanted to mention it but I left it out intentionally to avoid
confusion. I think the approach in the reactive mode is very different
from what we are proposing here. Reactive mode is really not about
autoscaling but about reacting to node changes in the cluster. We
specifically want to solve the business logic of autoscaling instead
of focusing on the execution part. For the first version, we plan to
do a full redeploy which job vertex parallelism overrides. However, we
have previously discussed with Chesnay that we could use Flink's old
/rescaling API which would allow a more graceful scaling of the
existing cluster. That would certainly be desirable.


-Max

On Mon, Nov 7, 2022 at 5:08 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> @Dong:
>
> Looking at the busyTime metrics in the TaskOMetricGroup it seems that busy
> time is actually defined as "not idle or (soft) backpressured" . So I think
> it would give us the correct reading based on what you said about the Kafka
> sink.
>
> In any case we have to test this and if something is not as we expect we
> should fix the metric.
>
> Gyula
>
> On Mon, Nov 7, 2022 at 5:00 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Thanks for the explanation Gyula. Please see my reply inline.
> >
> > BTW, has the proposed solution been deployed and evaluated with any
> > production workload? If yes, I am wondering if you could share the
> > experience, e.g. what is the likelihood of having regression and
> > improvement respectively after enabling this feature.
> >
> >
> > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> >> Hi Dong!
> >>
> >> Let me try to answer the questions :)
> >>
> >> 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time
> >> spent in the main record processing loop for an operator if I
> >> understand correctly. This includes IO operations too.
> >>
> >
> > I took a look at the StreamTask::processInput(...)
> > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576>.
> > My understanding of this code is that when KafkaSink (same for other sinks)
> > can not write data out to the network fast enough, recordWriter
> > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575>
> > becomes unavailable and the StreamTask is considered to be
> > soft-back-pressured, instead of being considered busy.
> >
> > If this is the case, then the algorithm currently proposed in the FLIP
> > might under-provision the sink operator's parallelism when the sink
> > operator is the bottleneck. I am not an expert with this piece of code
> > though. I am wondering if you or someone else could double-check this.
> >
> >
> >> 2: We should add this to the FLIP I agree. It would be a Duration config
> >> with the expected catch up time after rescaling (let's say 5 minutes). It
> >> could be computed based on the current data rate and the calculated max
> >> processing rate after the rescale.
> >>
> >> Great. I am looking forward to the formula :)
> >
> >
> >> 3: In the current proposal we don't have per operator configs. Target
> >> utilization would apply to all operators uniformly.
> >>
> >> 4: It should be configurable, yes.
> >>
> >> 5,6: The names haven't been finalized but I think these are minor
> >> details. We could add concrete names to the FLIP :)
> >>
> >> Sounds good.
> >
> >
> >> Cheers,
> >> Gyula
> >>
> >>
> >> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
> >>
> >>> Hi Max,
> >>>
> >>> Thank you for the proposal. The proposal tackles a very important issue
> >>> for Flink users and the design looks promising overall!
> >>>
> >>> I have some questions to better understand the proposed public
> >>> interfaces and the algorithm.
> >>>
> >>> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
> >>> could reach 1 sec. I believe this is mostly true for cpu-bound operators.
> >>> Could you confirm that this can also be true for io-bound operators such 
> >>> as
> >>> sinks? For example, suppose a Kafka Sink subtask has reached I/O 
> >>> bottleneck
> >>> when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> >>> reach 1 sec?
> >>>
> >>> 2) It is said that "users can configure a maximum time to fully process
> >>> the backlog". The configuration section does not seem to provide this
> >>> config. Could you specify this? And any chance this proposal can provide
> >>> the formula for calculating the new processing rate?
> >>>
> >>> 3) How are users expected to specify the per-operator configs (e.g.
> >>> target utilization)? For example, should users specify it programmatically
> >>> in a DataStream/Table/SQL API?
> >>>
> >>> 4) How often will the Flink Kubernetes operator query metrics from
> >>> JobManager? Is this configurable?
> >>>
> >>> 5) Could you specify the config name and default value for the proposed
> >>> configs?
> >>>
> >>> 6) Could you add the name/mbean/type for the proposed metrics?
> >>>
> >>>
> >>> Cheers,
> >>> Dong
> >>>
> >>>
> >>>

Reply via email to