Thanks for all the interest and the great remarks! Gyula has already done a great job addressing the questions. Let me try to add some additional context:
@Biao Geng:

> 1. For source parallelisms, if the user configure a much larger value than
> normal, there should be very little pending records though it is possible to
> get optimized. But IIUC, in current algorithm, we will not take actions for
> this case as the backlog growth rate is almost zero. Is the understanding
> right?

This is actually a corner case which we haven't described in the FLIP yet.
Sources are assumed to be scaled only according to the backlog, but if there
is zero backlog, we don't have a number to compute the parallelism from. In
this case we tune the source based on its utilization, just like we do for
the other vertices. That could mean reducing the parallelism in case the
source is not doing any work. Now, in case there is no backlog, we need to be
careful that we don't bounce back to a higher parallelism afterwards.

> 2. Compared with "scaling out", "scaling in" is usually more dangerous as it
> is more likely to lead to negative influence to the downstream jobs. The
> min/max load bounds should be useful. I am wondering if it is possible to
> have different strategy for "scaling in" to make it more conservative. Or
> more eagerly, allow custom autoscaling strategy (e.g. time-based strategy).

Gyula already mentioned the bounded scale-down. Additionally, we could use a
more conservative utilization target for scaling down. For example, if we
targeted 60% utilization for scale-up, we might target 30% utilization for
scale-down, essentially reducing the parallelism more slowly. As with the
bounded scale-down, in the worst case this will require multiple scale-downs.
Ideally, the metrics should be reliable enough that we do not require such
workarounds. A sketch of both scaling paths follows below.
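To make this more concrete, here is a minimal sketch of the two paths in
plain Java: backlog-based scaling for sources (essentially also the catch-up
formula Dong asks about in the quoted thread below) and utilization-based
scaling with a more conservative scale-down target. All names and thresholds
are invented for illustration, and it assumes the processing rate scales
linearly with parallelism, which the FLIP does not guarantee:

/**
 * Illustrative sketch only; not part of the FLIP. Thresholds are made up and
 * linear scaling of the processing rate is assumed.
 */
public class ScalingSketch {

    // Hypothetical asymmetric targets: scale up above 60% utilization,
    // scale down only below 30%, so scale-down proceeds more cautiously.
    private static final double SCALE_UP_TARGET = 0.6;
    private static final double SCALE_DOWN_TARGET = 0.3;

    /** Sources: sustain the input rate and drain the backlog within maxCatchUpSec. */
    static int scaleSourceOnBacklog(int parallelism, double inputRatePerSec,
                                    double totalProcessingRatePerSec,
                                    double backlogRecords, double maxCatchUpSec) {
        double targetRate = inputRatePerSec + backlogRecords / maxCatchUpSec;
        double ratePerSubtask = totalProcessingRatePerSec / parallelism;
        return (int) Math.ceil(targetRate / ratePerSubtask);
    }

    /** All other vertices, and sources with zero backlog. */
    static int scaleOnUtilization(int parallelism, double busyTimeMsPerSecond) {
        double utilization = busyTimeMsPerSecond / 1000.0;
        if (utilization > SCALE_UP_TARGET) {
            return (int) Math.ceil(parallelism * utilization / SCALE_UP_TARGET);
        }
        if (utilization < SCALE_DOWN_TARGET) {
            // Dividing by the lower target yields a higher parallelism than the
            // scale-up target would, i.e. a more conservative reduction.
            return Math.max(1, (int) Math.ceil(parallelism * utilization / SCALE_DOWN_TARGET));
        }
        return parallelism; // inside the dead band: no action
    }
}

The gap between the two targets acts as a dead band: a vertex sitting between
30% and 60% utilization is left alone, which also helps against bouncing back
to a higher parallelism right after a scale-down.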
@JunRui Lee:

> In the document, I didn't find the definition of when to trigger autoScaling
> after some jobVertex reach the threshold. If I missed it, please let me know.

The triggering is supposed to work based on the number of metric reports to
aggregate and the cool-down time. Additionally, there are boundaries for the
target rates such that we don't scale on tiny deviations of the rates. I
agree that we want to prevent unnecessary scalings as much as possible. We'll
expand on that.

@Pedro Silva:

> Have you considered making metrics collection getting triggered based on
> events rather than periodic checks?

Ideally, we want to continuously monitor the job to be able to find
bottlenecks. Based on the metrics, we will decide whether to scale or not.
However, if we find that continuous monitoring is too costly, we might do it
based on signals. Also, if there is some key event for which we must refresh
our metrics, that could be interesting; a sudden spike in the backlog could
warrant that.

> Could the FLIP also be used to auto-scale based on state-level metrics at an
> operator level?

It could, but we don't want to modify the JobGraph, which means we are bound
to using task-level parallelism. Setting operator-level parallelism would
mean rebuilding the JobGraph, which is a tricky thing to do. It would
increase the solution space but also the complexity of finding a stable
scaling configuration.

@Zheng:

> After the user opens (advice), it does not actually perform AutoScaling. It
> only outputs the notification form of tuning suggestions for the user's
> reference.

That's a great idea. Such a "dry run" feature would give users a better sense
of how the autoscaler would behave.

> I found that FFA 2020 Netflix has a related topic discussing the automatic
> tuning function

Thanks! They actually use a similar idea with respect to backlog-based
scaling. Where their approach differs is that they scale the entire job based
on a target load instead of scaling vertices individually. They have some
interesting ideas, like the lookup table for past scalings and the load
prediction based on regressions. I have intentionally left out those
optimizations, but I think they can be useful if implemented well.

> Can we provide some interfaces for users to customize and implement some
> tuning algorithms?

Certainly. I think it is critical that we provide a default implementation
that works well for all kinds of Flink jobs, but users should have the option
to plug in their own implementation. This will be especially useful for new
sources which might not have the backlog information available like some of
the built-in ones, e.g. Kafka.

@Dong:

> For example, suppose a Kafka Sink subtask has reached I/O bottleneck when
> flushing data out to the Kafka clusters, will busyTimeMsPerSecond reach 1
> sec?

The busyTimeMsPerSecond metric captures all the work of the task excluding
backpressure time, so a sink's IO work would be included, unless it is
non-blocking. Typically, sinks write to external systems which can themselves
backpressure, and they should, to avoid the sink being overwhelmed with data.
This can be problematic in case the external system is the bottleneck and a
parallelism increase does not increase the write rate. However, Gyula has
previously thought about this exact case and proposed to stop scaling such
tasks by measuring the effectiveness of the scaling decision and "giving up"
on that vertex in case it doesn't yield the expected result. A sketch of that
check follows below.
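Roughly, the check could compare the throughput gain we predicted for a
scale-up with the gain we actually observe afterwards. Everything below (the
threshold, the blocklist mechanics, the method names) is made up to
illustrate the idea, not a proposed API:

import java.util.HashSet;
import java.util.Set;

/** Sketch of the "giving up" heuristic for ineffective scale-ups. */
public class ScalingEffectivenessSketch {

    // Hypothetical: require at least half of the predicted gain to materialize.
    private static final double MIN_EFFECTIVENESS = 0.5;

    private final Set<String> blocklistedVertices = new HashSet<>();

    void evaluateScaleUp(String vertexId, double rateBefore,
                         double predictedRate, double observedRateAfter) {
        double predictedGain = predictedRate - rateBefore;
        double observedGain = observedRateAfter - rateBefore;
        if (predictedGain > 0 && observedGain / predictedGain < MIN_EFFECTIVENESS) {
            // The bottleneck is likely external (e.g. the system a sink writes
            // to), so further scale-ups of this vertex would waste resources.
            blocklistedVertices.add(vertexId);
        }
    }

    boolean mayScaleUp(String vertexId) {
        return !blocklistedVertices.contains(vertexId);
    }
}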
> The configuration section does not seem to provide this config. Could you
> specify this?

Thanks! I've added it.

> How are users expected to specify the per-operator configs?

That would only be possible when they create their job, like you would
normally write a Flink job. As soon as the autoscaler takes over, they won't
be able to change parallelisms. However, it would be conceivable that we
allow manually fixing some of the operator parallelisms, but that should be a
last resort and not required.

> How often will the Flink Kubernetes operator query metrics from JobManager?
> Is this configurable?

This will be configurable. I think the JobManager only aggregates metrics
every 5 seconds; something like every 30 seconds makes sense.

> 5) Could you specify the config name and default value for the proposed
> configs?
> 6) Could you add the name/mbean/type for the proposed metrics?

We will add this soon, but feel free to make suggestions on the wiki page if
you have ideas.

@Yanfei:

> It's said that "The first step is collecting metrics for all JobVertices by
> combining metrics from all the runtime subtasks and computing the *average*".
> When the load of the subtasks of an operator is not balanced, do we need to
> trigger autoScaling? Has the median or some percentiles been considered?

It hasn't, but the suggestion is very welcome! There are pros and cons for
every aggregation method. The median is certainly something to look at
because, in contrast to the average, it would prevent hot partitions from
triggering a large scale-up. The example below shows the difference on a
skewed vertex.
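A tiny self-contained illustration (values invented) of how the two
aggregations diverge when a single subtask is hot:

import java.util.Arrays;

/** Demo of average vs. median aggregation of per-subtask busy time. */
public class AggregationSketch {

    static double average(double[] v) {
        return Arrays.stream(v).average().orElse(0);
    }

    static double median(double[] v) {
        double[] s = v.clone();
        Arrays.sort(s);
        int n = s.length;
        return n % 2 == 1 ? s[n / 2] : (s[n / 2 - 1] + s[n / 2]) / 2;
    }

    public static void main(String[] args) {
        // One hot partition keeps a single subtask at 950 ms/s busy time,
        // while the rest of the vertex is mostly idle.
        double[] busyTimeMsPerSecond = {950, 120, 110, 100, 90, 80, 100, 110};

        System.out.printf("average: %.1f ms/s%n", average(busyTimeMsPerSecond)); // 207.5
        System.out.printf("median:  %.1f ms/s%n", median(busyTimeMsPerSecond)); // 105.0
    }
}

The average suggests roughly twice the typical per-subtask load and could
trigger a large scale-up; the median barely moves, at the cost of hiding the
hot partition entirely.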
> IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this proposal, will
> we reuse some logic from Reactive Mode?

I wanted to mention it but left it out intentionally to avoid confusion. I
think the approach in Reactive Mode is very different from what we are
proposing here. Reactive Mode is really not about autoscaling but about
reacting to node changes in the cluster. We specifically want to solve the
business logic of autoscaling instead of focusing on the execution part. For
the first version, we plan to do a full redeploy with job vertex parallelism
overrides. However, we have previously discussed with Chesnay that we could
use Flink's old /rescaling API, which would allow a more graceful scaling of
the existing cluster. That would certainly be desirable.

-Max

On Mon, Nov 7, 2022 at 5:08 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> @Dong:
>
> Looking at the busyTime metrics in the TaskIOMetricGroup it seems that busy
> time is actually defined as "not idle or (soft) backpressured". So I think
> it would give us the correct reading based on what you said about the Kafka
> sink.
>
> In any case we have to test this, and if something is not as we expect, we
> should fix the metric.
>
> Gyula
>
> On Mon, Nov 7, 2022 at 5:00 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Thanks for the explanation Gyula. Please see my reply inline.
> >
> > BTW, has the proposed solution been deployed and evaluated with any
> > production workload? If yes, I am wondering if you could share the
> > experience, e.g. what is the likelihood of having regression and
> > improvement respectively after enabling this feature.
> >
> > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> >> Hi Dong!
> >>
> >> Let me try to answer the questions :)
> >>
> >> 1: busyTimeMsPerSecond is not specific to CPU, it measures the time
> >> spent in the main record processing loop for an operator, if I
> >> understand correctly. This includes IO operations too.
> >>
> > I took a look at StreamTask::processInput(...)
> > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576>.
> > My understanding of this code is that when KafkaSink (same for other
> > sinks) cannot write data out to the network fast enough, recordWriter
> > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575>
> > becomes unavailable and the StreamTask is considered to be
> > soft-back-pressured, instead of being considered busy.
> >
> > If this is the case, then the algorithm currently proposed in the FLIP
> > might under-provision the sink operator's parallelism when the sink
> > operator is the bottleneck. I am not an expert with this piece of code
> > though. I am wondering if you or someone else could double-check this.
> >
> >> 2: We should add this to the FLIP, I agree. It would be a Duration
> >> config with the expected catch-up time after rescaling (let's say 5
> >> minutes). It could be computed based on the current data rate and the
> >> calculated max processing rate after the rescale.
> >>
> > Great. I am looking forward to the formula :)
> >
> >> 3: In the current proposal we don't have per-operator configs. Target
> >> utilization would apply to all operators uniformly.
> >>
> >> 4: It should be configurable, yes.
> >>
> >> 5, 6: The names haven't been finalized but I think these are minor
> >> details. We could add concrete names to the FLIP :)
> >>
> > Sounds good.
> >
> >> Cheers,
> >> Gyula
> >>
> >> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:
> >>
> >>> Hi Max,
> >>>
> >>> Thank you for the proposal. The proposal tackles a very important
> >>> issue for Flink users and the design looks promising overall!
> >>>
> >>> I have some questions to better understand the proposed public
> >>> interfaces and the algorithm.
> >>>
> >>> 1) The proposal seems to assume that the operator's
> >>> busyTimeMsPerSecond could reach 1 sec. I believe this is mostly true
> >>> for cpu-bound operators. Could you confirm that this can also be true
> >>> for io-bound operators such as sinks? For example, suppose a Kafka
> >>> Sink subtask has reached I/O bottleneck when flushing data out to the
> >>> Kafka clusters, will busyTimeMsPerSecond reach 1 sec?
> >>>
> >>> 2) It is said that "users can configure a maximum time to fully
> >>> process the backlog". The configuration section does not seem to
> >>> provide this config. Could you specify this? And any chance this
> >>> proposal can provide the formula for calculating the new processing
> >>> rate?
> >>>
> >>> 3) How are users expected to specify the per-operator configs (e.g.
> >>> target utilization)? For example, should users specify it
> >>> programmatically in a DataStream/Table/SQL API?
> >>>
> >>> 4) How often will the Flink Kubernetes operator query metrics from
> >>> JobManager? Is this configurable?
> >>>
> >>> 5) Could you specify the config name and default value for the
> >>> proposed configs?
> >>>
> >>> 6) Could you add the name/mbean/type for the proposed metrics?
> >>>
> >>> Cheers,
> >>> Dong