Thanks for the fruitful discussion! I am really excited to see that auto-scaling is actually happening for the
Flink Kubernetes operator. It will be a very important step towards making
long-running Flink jobs run more smoothly. I have some immature ideas and
want to share them here.

# Resource Reservation

Since the current auto-scaling needs to fully redeploy the application, it
may fail to start due to a lack of resources. I know the Kubernetes
operator could roll back to the old spec, but we would still waste a lot
of time, making things worse. I hope FLIP-250[1] (support customized K8s
schedulers) could help in this case.

# Session cluster

Is there a plan for auto-scaling to support jobs running in a session
cluster? It might be a different story, since we could not use Flink
config options to override the job vertex parallelisms. Given that the
SessionJob is also a first-class citizen, we need to document the
limitation if it is not supported.

# Horizontal scaling vs. vertical scaling

IIUC, the current proposal does not mention vertical scaling. There is a
chance that the memory/CPU of the TaskManager is not configured properly,
and this will cause unnecessary repeated scaling executions.

[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal

Best,
Yang

Maximilian Michels <m...@apache.org> wrote on Tue, Nov 8, 2022 at 00:31:

> Thanks for all the interest here and for the great remarks! Gyula
> already did a great job addressing the questions here. Let me try to
> add additional context:
>
> @Biao Geng:
>
> >1. For source parallelisms, if the user configures a much larger value
> than normal, there should be very few pending records, though it is
> possible to get optimized. But IIUC, in the current algorithm, we will
> not take actions for this case as the backlog growth rate is almost
> zero. Is this understanding right?
>
> This is actually a corner case which we haven't exactly described in
> the FLIP yet. Sources are assumed to only be scaled according to the
> backlog, but if there is zero backlog, we don't have a number to
> compute the parallelism from. In this case we tune the source based on
> the utilization, just like we do for the other vertices. That could
> mean reducing the parallelism in case the source is not doing any work.
> Now, in case there is no backlog, we need to be careful that we don't
> bounce back to a higher parallelism afterwards.
>
> >2. Compared with “scaling out”, “scaling in” is usually more
> dangerous, as it is more likely to have a negative influence on
> downstream jobs. The min/max load bounds should be useful. I am
> wondering if it is possible to have a different strategy for “scaling
> in” to make it more conservative, or, more ambitiously, to allow custom
> autoscaling strategies (e.g. a time-based strategy).
>
> Gyula already mentioned the bounded scale-down. Additionally, we could
> add more conservative utilization targets for scale-down. For example,
> if we targeted 60% utilization for scale-up, we might target 30%
> utilization for scale-down, essentially reducing the parallelism more
> slowly. As with the bounded parallelism scale-down, in the worst case
> this will require multiple scale-downs. Ideally, the metrics should be
> reliable enough that we do not need such workarounds.
>
> @JunRui Lee:
>
> >In the document, I didn't find the definition of when to trigger
> autoscaling after some JobVertex reaches the threshold. If I missed it,
> please let me know.
>
> The triggering is supposed to work based on the number of metric
> reports to aggregate and the cool-down time. Additionally, there are
> boundaries for the target rates such that we don't scale on tiny
> deviations of the rates. I agree that we want to prevent unnecessary
> scalings as much as possible. We'll expand on that.
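[To make that gating concrete, here is a minimal sketch of how the
trigger could be evaluated, assuming a window of metric reports, a
cool-down period, and a minimum rate deviation. None of these class or
parameter names come from the FLIP; they are made up for illustration.]

    import java.time.Duration;
    import java.time.Instant;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Illustrative only: aggregate a window of metric reports, respect a
    // cool-down after the last rescale, and ignore tiny rate deviations.
    class ScalingTrigger {
        private final int windowSize;      // metric reports to aggregate
        private final Duration coolDown;   // minimum time between rescalings
        private final double minDeviation; // e.g. 0.1 = ignore changes < 10%

        private final Deque<Double> window = new ArrayDeque<>();
        private Instant lastScaling = Instant.EPOCH;

        ScalingTrigger(int windowSize, Duration coolDown, double minDeviation) {
            this.windowSize = windowSize;
            this.coolDown = coolDown;
            this.minDeviation = minDeviation;
        }

        /** Returns true if this metric report should trigger a scaling evaluation. */
        boolean onReport(double targetRate, double currentRate, Instant now) {
            window.addLast(targetRate);
            if (window.size() > windowSize) {
                window.removeFirst();
            }
            if (window.size() < windowSize) {
                return false; // not enough reports aggregated yet
            }
            if (Duration.between(lastScaling, now).compareTo(coolDown) < 0) {
                return false; // still cooling down after the last rescale
            }
            double avgTarget = window.stream()
                    .mapToDouble(Double::doubleValue).average().orElse(currentRate);
            if (Math.abs(avgTarget - currentRate) / currentRate < minDeviation) {
                return false; // deviation too small to act on
            }
            lastScaling = now;
            return true;
        }
    }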
> @Pedro Silva:
>
> >Have you considered making metrics collection get triggered based on
> events rather than periodic checks?
>
> Ideally we want to continuously monitor the job to be able to find
> bottlenecks. Based on the metrics, we will decide whether to scale or
> not. However, if we find that the continuous monitoring is too costly,
> we might do it based on signals. Also, if there is some key event that
> forces us to refresh our metrics, that could be interesting as well. A
> sudden spike in the backlog could warrant that.
>
> >Could the FLIP also be used to auto-scale based on state-level metrics
> at an operator level?
>
> It could, but we don't want to modify the JobGraph, which means we are
> bound to using task-level parallelism. Setting operator-level
> parallelism would mean rebuilding the JobGraph, which is a tricky thing
> to do. It would increase the solution space but also the complexity of
> finding a stable scaling configuration.
>
> @Zheng:
>
> >After the user enables it (advice mode), it does not actually perform
> autoscaling. It only outputs tuning suggestions, in the form of
> notifications, for the user's reference.
>
> That's a great idea. Such a "dry run" feature would give users a
> better sense of how the autoscaler would work.
>
> >I found that FFA 2020 Netflix has a related topic discussing the
> automatic tuning function
>
> Thanks! They actually use a similar idea with respect to backlog-based
> scaling. Where their approach differs is that they scale the entire
> job based on a target load instead of scaling vertices individually.
> They have some interesting ideas, like the lookup table for past
> scalings and the load prediction based on regressions. I have
> intentionally left out those optimizations, but I think they can be
> useful if implemented well.
>
> >Can we provide some interfaces for users to customize and implement
> some tuning algorithms?
>
> Certainly. I think it is critical that we provide a default
> implementation that works well for all kinds of Flink jobs, but users
> should have the option to plug in their own implementation. This will
> especially be useful for new sources which might not have the backlog
> information available, unlike built-in ones such as the Kafka source.
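[As a thought experiment on the pluggability point above, the plugin
point could be as small as a single interface that maps collected
metrics to proposed parallelisms. Everything below is hypothetical; the
FLIP does not define this API, and all names are made up.]

    import java.util.Map;

    // Hypothetical metric holder; the fields mirror the kinds of metrics
    // discussed in this thread (busy time, rates, backlog).
    final class VertexMetrics {
        final double busyTimeMsPerSecond; // time spent doing useful work
        final double processingRate;      // records/s currently processed
        final double targetRate;          // records/s needed to keep up
        final double backlog;             // pending records (sources only)

        VertexMetrics(double busyTimeMsPerSecond, double processingRate,
                      double targetRate, double backlog) {
            this.busyTimeMsPerSecond = busyTimeMsPerSecond;
            this.processingRate = processingRate;
            this.targetRate = targetRate;
            this.backlog = backlog;
        }
    }

    // Hypothetical plugin point: maps per-vertex metrics to new parallelisms.
    interface ScalingAlgorithm {
        /**
         * @param metrics aggregated metrics per job vertex id
         * @param currentParallelism current parallelism per job vertex id
         * @return proposed parallelism per vertex; an empty map means "no change"
         */
        Map<String, Integer> computeParallelisms(
                Map<String, VertexMetrics> metrics,
                Map<String, Integer> currentParallelism);
    }

[A custom implementation could then encode, say, a time-of-day schedule
or the regression-based prediction mentioned above, while reusing the
operator's metric collection.]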
> @Dong:
>
> >For example, suppose a Kafka Sink subtask has reached an I/O
> bottleneck when flushing data out to the Kafka clusters, will
> busyTimeMsPerSecond reach 1 sec?
>
> The busyTimeMsPerSecond metric captures all the work of the task
> excluding backpressure time, so a sink's IO work would be included,
> unless it is non-blocking. Typically, sinks write to external systems
> which can themselves backpressure, and should, to avoid the sink being
> overwhelmed with data. This can be problematic in case the external
> system is the bottleneck and a parallelism increase does not increase
> the write rate. However, Gyula has previously thought about this exact
> case and proposed to stop scaling such tasks by measuring the
> effectiveness of the scaling decision and "giving up" on that vertex
> in case the rescale does not yield the expected result.
>
> >The configuration section does not seem to provide this config. Could
> you specify this?
>
> Thanks! I've added it.
>
> >How are users expected to specify the per-operator configs?
>
> That would only be possible when they create their job, the way you
> would normally write a Flink job. As soon as the autoscaler takes over,
> they won't be able to change parallelisms. However, it would be
> conceivable that we allow manually fixing some of the operator
> parallelisms, but that should be a last resort and not required.
>
> >How often will the Flink Kubernetes operator query metrics from the
> JobManager? Is this configurable?
>
> This will be configurable. The JobManager only aggregates metrics every
> 5 seconds, I believe. Something like every 30 seconds makes sense.
>
> >5) Could you specify the config name and default value for the
> proposed configs? 6) Could you add the name/mbean/type for the proposed
> metrics?
>
> We will add this soon, but feel free to suggest names on the wiki page
> if you have ideas.
>
> @Yanfei:
>
> >It's said that "The first step is collecting metrics for all
> JobVertices by combining metrics from all the runtime subtasks and
> computing the *average*". When the load of the subtasks of an operator
> is not balanced, do we need to trigger autoscaling? Has the median or
> some percentiles been considered?
>
> It hasn't, but the suggestion is very welcome! There are pros and cons
> for every aggregation method. The median is certainly something to look
> at because, in contrast to the average, it would prevent hot partitions
> from triggering a large scale-up.
>
> >IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this proposal,
> will we reuse some logic from Reactive Mode?
>
> I wanted to mention it, but I left it out intentionally to avoid
> confusion. I think the approach in reactive mode is very different from
> what we are proposing here. Reactive mode is really not about
> autoscaling but about reacting to node changes in the cluster. We
> specifically want to solve the business logic of autoscaling instead of
> focusing on the execution part. For the first version, we plan to do a
> full redeploy with job vertex parallelism overrides. However, we have
> previously discussed with Chesnay that we could use Flink's old
> /rescaling API, which would allow a more graceful scaling of the
> existing cluster. That would certainly be desirable.
>
> -Max
>
> On Mon, Nov 7, 2022 at 5:08 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > @Dong:
> >
> > Looking at the busyTime metrics in the TaskIOMetricGroup, it seems
> > that busy time is actually defined as "not idle or (soft)
> > backpressured". So I think it would give us the correct reading
> > based on what you said about the Kafka sink.
> >
> > In any case, we have to test this, and if something is not as we
> > expect, we should fix the metric.
> >
> > Gyula
> >
> > On Mon, Nov 7, 2022 at 5:00 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Thanks for the explanation Gyula. Please see my reply inline.
> > >
> > > BTW, has the proposed solution been deployed and evaluated with
> > > any production workload? If yes, I am wondering if you could share
> > > the experience, e.g. what is the likelihood of having a regression
> > > or an improvement after enabling this feature.
> > >
> > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com>
> > > wrote:
> > >
> > >> Hi Dong!
> > >>
> > >> Let me try to answer the questions :)
> > >>
> > >> 1: busyTimeMsPerSecond is not specific to CPU; it measures the
> > >> time spent in the main record processing loop for an operator,
> > >> if I understand correctly. This includes IO operations too.
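[As an aside on the busy-time discussion: a utilization-driven
calculation could use busyTimeMsPerSecond roughly as sketched below.
This is only an illustration of the arithmetic, not the FLIP's
published formula, and the names are made up.]

    // Sketch only: derive a new parallelism from the observed busy-time
    // ratio and a target utilization.
    final class UtilizationScaling {
        static int newParallelism(
                int currentParallelism,
                double busyTimeMsPerSecond,
                double targetUtilization) {
            double busyRatio = busyTimeMsPerSecond / 1000.0; // 1000 ms/s == fully busy
            return (int) Math.ceil(currentParallelism * busyRatio / targetUtilization);
        }
    }

[For example, a vertex running at parallelism 10 that is busy 900 ms per
second, with a 60% utilization target, would be scaled to
ceil(10 * 0.9 / 0.6) = 15.]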
> > > I took a look at the StreamTask::processInput(...)
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576>.
> > > My understanding of this code is that when the KafkaSink (same for
> > > other sinks) cannot write data out to the network fast enough, the
> > > recordWriter
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575>
> > > becomes unavailable and the StreamTask is considered to be
> > > soft-backpressured, instead of being considered busy.
> > >
> > > If this is the case, then the algorithm currently proposed in the
> > > FLIP might under-provision the sink operator's parallelism when
> > > the sink operator is the bottleneck. I am not an expert on this
> > > piece of code, though. I am wondering if you or someone else could
> > > double-check this.
> > >
> > >> 2: We should add this to the FLIP, I agree. It would be a
> > >> Duration config with the expected catch-up time after rescaling
> > >> (let's say 5 minutes). It could be computed based on the current
> > >> data rate and the calculated max processing rate after the
> > >> rescale.
> > >>
> > > Great. I am looking forward to the formula :)
> > >
> > >> 3: In the current proposal we don't have per-operator configs.
> > >> Target utilization would apply to all operators uniformly.
> > >>
> > >> 4: It should be configurable, yes.
> > >>
> > >> 5, 6: The names haven't been finalized, but I think these are
> > >> minor details. We could add concrete names to the FLIP :)
> > >>
> > > Sounds good.
> > >
> > >> Cheers,
> > >> Gyula
> > >>
> > >> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
> > >>
> > >>> Hi Max,
> > >>>
> > >>> Thank you for the proposal. It tackles a very important issue
> > >>> for Flink users, and the design looks promising overall!
> > >>>
> > >>> I have some questions to better understand the proposed public
> > >>> interfaces and the algorithm.
> > >>>
> > >>> 1) The proposal seems to assume that the operator's
> > >>> busyTimeMsPerSecond could reach 1 sec. I believe this is mostly
> > >>> true for CPU-bound operators. Could you confirm that this can
> > >>> also be true for IO-bound operators such as sinks? For example,
> > >>> suppose a Kafka Sink subtask has reached an I/O bottleneck when
> > >>> flushing data out to the Kafka clusters, will
> > >>> busyTimeMsPerSecond reach 1 sec?
> > >>>
> > >>> 2) It is said that "users can configure a maximum time to fully
> > >>> process the backlog". The configuration section does not seem to
> > >>> provide this config. Could you specify this? And any chance this
> > >>> proposal can provide the formula for calculating the new
> > >>> processing rate?
> > >>>
> > >>> 3) How are users expected to specify the per-operator configs
> > >>> (e.g. target utilization)? For example, should users specify
> > >>> them programmatically in the DataStream/Table/SQL API?
> > >>>
> > >>> 4) How often will the Flink Kubernetes operator query metrics
> > >>> from the JobManager? Is this configurable?
> > >>>
> > >>> 5) Could you specify the config name and default value for the
> > >>> proposed configs?
> > >>>
> > >>> 6) Could you add the name/mbean/type for the proposed metrics?
> > >>>
> > >>> Cheers,
> > >>> Dong
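[On point 2 above (the expected catch-up time after rescaling), one
plausible shape for the computation is sketched below. The formula is an
assumption based on Gyula's description (keep up with the current data
rate while draining the backlog within the configured duration); the
FLIP has not published the final version, and the names are made up.]

    import java.time.Duration;

    // Sketch of a possible catch-up computation; not the FLIP's final formula.
    final class CatchUp {
        // Rate needed to keep up with new input AND drain the backlog in time.
        static double requiredRate(
                double avgInputRatePerSec, double backlogRecords, Duration catchUpTime) {
            return avgInputRatePerSec + backlogRecords / catchUpTime.toSeconds();
        }

        // Parallelism implied by the measured per-subtask max processing rate.
        static int requiredParallelism(double requiredRate, double maxRatePerSubtask) {
            return (int) Math.ceil(requiredRate / maxRatePerSubtask);
        }
    }

[For example, at 1,000 records/s input with a backlog of 600,000 records
and a 5-minute catch-up target, the vertex must sustain
1,000 + 600,000/300 = 3,000 records/s; if one subtask maxes out at 800
records/s, that implies a parallelism of ceil(3000/800) = 4.]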