+1 If there are no further comments, I'll start a vote thread in the next few days.
-Max

On Tue, Nov 15, 2022 at 2:06 PM Zheng Yu Chen <jam.gz...@gmail.com> wrote:

@Gyula Good news: FLIP-256 is now finished and merged. The FLIP-271 discussion seems to have stopped and I wonder if there are any other comments. Can we start the vote and get going on this exciting feature 😀 Maybe I can get involved in developing this feature.

On Tue, Nov 8, 2022 at 18:46, Gyula Fóra <gyula.f...@gmail.com> wrote:

I had 2 extra comments on Max's reply:

1. About pre-allocating resources:
This could be done relatively easily through the operator when the standalone deployment mode is used, as there we have better control of pods/resources.

2. Session jobs:
There is a FLIP (https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api) to support passing configuration when we submit jobs to the session cluster through the REST API. Once that goes through, session jobs can also be scaled in a similar way through the configuration.

Cheers,
Gyula

On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels <m...@apache.org> wrote:

@Yang

> Since the current auto-scaling needs to fully redeploy the application, it may fail to start due to lack of resources.

Great suggestions. I agree that we will have to preallocate / reserve resources to ensure the rescaling doesn't take longer than expected. This is not only a problem when scaling up but also when scaling down, because any pods surrendered might be taken over by another deployment during the rescaling. This would certainly be a case for integrating autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the rescaling API. Alternatively, the operator would have to reserve the resources somehow.
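The failure mode discussed above (redeploy, wait for resources, roll back if they never arrive) can be sketched as a simple control loop. This is an illustration only; `deploy`, `apply`, and `status` are hypothetical stand-ins, not the operator's actual API:

```python
import time

def rescale_with_rollback(deploy, old_spec, new_spec, timeout_s=300):
    """Redeploy with the scaled spec; fall back to the old spec if the
    cluster cannot acquire resources before the timeout. `deploy` is a
    stand-in for whatever applies a spec and reports job status."""
    deploy.apply(new_spec)
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if deploy.status() == "RUNNING":
            return "scaled"
        time.sleep(5)
    # Lack of resources: roll back to the old spec. The time spent waiting
    # here is exactly the waste that up-front resource reservation
    # (e.g. via FLIP-250) would avoid.
    deploy.apply(old_spec)
    return "rolled-back"
```

Note that in the rollback branch the old pods may already have been surrendered and taken by another deployment, which is why the thread argues reservation is needed on scale-down as well.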
> Does auto-scaling have a plan to support jobs which are running in a session cluster?

We are targeting the application deployment mode for the first version, but the standalone mode can be supported as soon as we have an integration with the scheduler.

> # Horizontal scaling vs. Vertical scaling

True. We left out vertical scaling intentionally. For now we assume CPU / memory is set up by the user. While definitely useful, vertical scaling adds another dimension to the scaling problem which we wanted to tackle later. I'll update the FLIP to explicitly state that.

-Max

On Tue, Nov 8, 2022 at 3:59 AM Yang Wang <danrtsey...@gmail.com> wrote:

Thanks for the fruitful discussion. I am really excited to see that auto-scaling is really happening for the Flink Kubernetes operator. It will be a very important step to make long-running Flink jobs run more smoothly.

I just have some immature ideas and want to share them here.

# Resource Reservation

Since the current auto-scaling needs to fully redeploy the application, it may fail to start due to lack of resources. I know the Kubernetes operator could roll back to the old spec, but we would still waste a lot of time, making things worse. I hope FLIP-250[1] (support for customized K8s schedulers) could help in this case.

# Session cluster

Does auto-scaling have a plan to support jobs which are running in a session cluster? It might be a different story, since we could not use Flink config options to override the job vertex parallelisms.
Given that the SessionJob is also a first-class citizen, we need to document the limitation if it is not supported.

# Horizontal scaling vs. Vertical scaling

IIUC, the current proposal does not mention vertical scaling. There might be a chance that the memory/CPU of the TaskManager is not configured properly, and this would cause unnecessary multiple scaling executions.

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal

Best,
Yang

On Tue, Nov 8, 2022 at 00:31, Maximilian Michels <m...@apache.org> wrote:

Thanks for all the interest here and for the great remarks! Gyula already did a great job addressing the questions here. Let me try to add additional context:

@Biao Geng:

> 1. For source parallelisms, if the user configures a much larger value than normal, there should be very few pending records, though it is possible to optimize. But IIUC, in the current algorithm, we will not take actions for this case, as the backlog growth rate is almost zero. Is that understanding right?

This is actually a corner case which we haven't exactly described in the FLIP yet. Sources are assumed to only be scaled according to the backlog, but if there is zero backlog, we don't have a number to compute the parallelism from. In this case we tune the source based on the utilization, just like we do for the other vertices. That could mean reducing the parallelism in case the source is not doing any work.
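As a rough sketch of the backlog-based source sizing discussed in this thread: the required processing rate is the incoming rate plus whatever extra is needed to drain the current backlog within the configured catch-up window. Function and variable names are made up for illustration; the FLIP's actual formula may differ:

```python
import math

def required_source_parallelism(incoming_rate, backlog, catch_up_seconds,
                                per_subtask_rate):
    """Pick a source parallelism that keeps up with incoming data AND
    drains the backlog within the catch-up window.
    Rates are records/second; backlog is in records."""
    target_rate = incoming_rate + backlog / catch_up_seconds
    return max(1, math.ceil(target_rate / per_subtask_rate))
```

For example, with 1,000 records/s incoming, a backlog of 600,000 records, a 5-minute (300 s) catch-up target, and a measured 500 records/s per subtask, the target rate is 3,000 records/s and the sketch suggests 6 subtasks. With zero backlog the formula degenerates to rate-matching only, which is exactly the corner case above where utilization has to take over.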
Now, in case there is no backlog, we need to be careful that we don't bounce back to a higher parallelism afterwards.

> 2. Compared with "scaling out", "scaling in" is usually more dangerous, as it is more likely to have a negative influence on the downstream jobs. The min/max load bounds should be useful. I am wondering if it is possible to have a different strategy for "scaling in" to make it more conservative. Or, more eagerly, allow a custom autoscaling strategy (e.g. a time-based strategy).

Gyula already mentioned the bounded scale-down. Additionally, we could add more conservative utilization targets for scale-down. For example, if we targeted 60% utilization for scale-up, we might target 30% utilization for scale-down, essentially reducing the parallelism more slowly. As with the limited parallelism scale-down, in the worst case this will require multiple scale-downs. Ideally, the metrics should be reliable enough that we do not require such workarounds.

@JunRui Lee:

> In the document, I didn't find the definition of when to trigger autoscaling after some JobVertex reaches the threshold. If I missed it, please let me know.

The triggering is supposed to work based on the number of metric reports to aggregate and the cooldown time. Additionally, there are boundaries for the target rates such that we don't scale on tiny deviations of the rates. I agree that we want to prevent unnecessary scalings as much as possible. We'll expand on that.

@Pedro Silva:

> Have you considered making metrics collection triggered based on events rather than periodic checks?
Ideally we want to continuously monitor the job to be able to find bottlenecks. Based on the metrics, we will decide whether to scale or not. However, if we find that the continuous monitoring is too costly, we might do it based on signals. Also, if there is some key event for which we must refresh our metrics, that could also be interesting. A sudden spike in the backlog could warrant that.

> Could the FLIP also be used to auto-scale based on state-level metrics at an operator level?

It could, but we don't want to modify the JobGraph, which means we are bound to using task-level parallelism. Setting operator-level parallelism would mean rebuilding the JobGraph, which is a tricky thing to do. It would increase the solution space but also the complexity of finding a stable scaling configuration.

@Zheng:

> After the user enables it (advice mode), it does not actually perform autoscaling; it only outputs tuning suggestions as notifications for the user's reference.

That's a great idea. Such a "dry run" feature would give users a better sense of how the autoscaler would work.

> I found that at FFA 2020, Netflix had a related talk discussing automatic tuning.

Thanks! They actually use a similar idea with respect to backlog-based scaling. Where their approach differs is that they scale the entire job based on a target load instead of scaling vertices individually. They have some interesting ideas, like the lookup table for past scalings and the load prediction based on regressions.
I have intentionally left out those optimizations, but I think they can be useful if implemented well.

> Can we provide some interfaces for users to customize and implement their own tuning algorithms?

Certainly. I think it is critical that we provide a default implementation that works well for all kinds of Flink jobs, but users should have the option to plug in their own implementation. This will especially be useful for new sources which might not have the backlog information available like some of the built-in ones such as Kafka.

@Dong:

> For example, suppose a Kafka Sink subtask has reached an I/O bottleneck when flushing data out to the Kafka clusters, will busyTimeMsPerSecond reach 1 sec?

The busyTimeMsPerSecond metric captures all the work of the task excluding backpressure time, so a sink's IO work would be included, unless it is non-blocking. Typically sinks write to external systems which can themselves backpressure, and should, to avoid the sink being overwhelmed with data. This can be problematic in case the external system is the bottleneck and a parallelism increase does not increase the write rate. However, Gyula has previously thought about this exact case and proposed to stop scaling those tasks by measuring the effectiveness of the scaling decision and "giving up" on that vertex in case we don't get the expected result.

> The configuration section does not seem to provide this config. Could you specify this?

Thanks! I've added it.

> How are users expected to specify the per-operator configs?
That would only be possible when they create their job, as you would normally write a Flink job. As soon as the autoscaler takes over, they won't be able to change parallelisms. However, it would be conceivable that we allow manually fixing some of the operator parallelisms, but that should be a last resort and not required.

> How often will the Flink Kubernetes operator query metrics from the JobManager? Is this configurable?

This will be configurable. I think the JobManager only aggregates metrics every 5 seconds. I think something like every 30 seconds makes sense.

> 5) Could you specify the config name and default value for the proposed configs? 6) Could you add the name/mbean/type for the proposed metrics?

We will add this soon, but feel free to suggest in the wiki page if you have ideas.

@Yanfei:

> It's said that "The first step is collecting metrics for all JobVertices by combining metrics from all the runtime subtasks and computing the *average*". When the load of the subtasks of an operator is not balanced, do we need to trigger autoscaling? Has the median or some percentiles been considered?

It hasn't, but the suggestion is very welcome! There are pros and cons to every aggregation method. The median is certainly something to look at because, in contrast to the average, it would prevent hot partitions from triggering a large scale-up.

> IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this proposal. Will we reuse some logic from Reactive Mode?

I wanted to mention it, but I left it out intentionally to avoid confusion.
I think the approach in Reactive Mode is very different from what we are proposing here. Reactive Mode is really not about autoscaling but about reacting to node changes in the cluster. We specifically want to solve the business logic of autoscaling instead of focusing on the execution part. For the first version, we plan to do a full redeploy with job vertex parallelism overrides. However, we have previously discussed with Chesnay that we could use Flink's old /rescaling API, which would allow a more graceful scaling of the existing cluster. That would certainly be desirable.

-Max

On Mon, Nov 7, 2022 at 5:08 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

@Dong:

Looking at the busyTime metrics in the TaskIOMetricGroup, it seems that busy time is actually defined as "not idle or (soft) backpressured". So I think it would give us the correct reading based on what you said about the Kafka sink.

In any case we have to test this, and if something is not as we expect, we should fix the metric.

Gyula

On Mon, Nov 7, 2022 at 5:00 PM Dong Lin <lindon...@gmail.com> wrote:

Thanks for the explanation Gyula. Please see my reply inline.

BTW, has the proposed solution been deployed and evaluated with any production workload? If yes, I am wondering if you could share the experience, e.g. what is the likelihood of seeing a regression or an improvement, respectively, after enabling this feature.
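To make the busyTimeMsPerSecond discussion concrete, here is a minimal sketch of utilization-based vertex scaling, including the more conservative scale-down threshold floated earlier in the thread (60% up, 30% down). The names and the exact decision rule are my own illustration, not the FLIP's:

```python
import math

def scale_vertex(parallelism, busy_time_ms_per_sec,
                 target_up=0.60, target_down=0.30):
    """busyTimeMsPerSecond ranges over [0, 1000]; 1000 means the task is
    never idle or backpressured. Scale up toward target_up utilization,
    but only scale down once utilization drops below target_down."""
    utilization = busy_time_ms_per_sec / 1000.0
    if utilization > target_up:
        return math.ceil(parallelism * utilization / target_up)
    if utilization < target_down:
        # Size the shrunk vertex for the scale-up target, leaving headroom.
        return max(1, math.ceil(parallelism * utilization / target_up))
    return parallelism  # inside the band: leave the vertex alone
```

A vertex at 90% busy with parallelism 4 would grow to 6; one at 50% stays put; one at 20% with parallelism 10 would shrink to 4. Aiming at target_up rather than target_down when shrinking is what makes scale-down conservative: the new deployment lands below the scale-up threshold, so it does not immediately bounce back.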
On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Dong!
>
> Let me try to answer the questions :)
>
> 1: busyTimeMsPerSecond is not specific to CPU; it measures the time spent in the main record-processing loop for an operator, if I understand correctly. This includes IO operations too.

I took a look at StreamTask::processInput(...) (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L576). My understanding of this code is that when the KafkaSink (same for other sinks) cannot write data out to the network fast enough, the recordWriter (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L575) becomes unavailable and the StreamTask is considered to be soft-backpressured, instead of being considered busy.

If this is the case, then the algorithm currently proposed in the FLIP might under-provision the sink operator's parallelism when the sink operator is the bottleneck. I am not an expert on this piece of code, though. I am wondering if you or someone else could double-check this.

> 2: We should add this to the FLIP, I agree. It would be a Duration config with the expected catch-up time after rescaling (let's say 5 minutes). It could be computed based on the current data rate and the calculated max processing rate after the rescale.

Great. I am looking forward to the formula :)

> 3: In the current proposal we don't have per-operator configs. Target utilization would apply to all operators uniformly.
>
> 4: It should be configurable, yes.
>
> 5, 6: The names haven't been finalized, but I think these are minor details. We could add concrete names to the FLIP :)

Sounds good.

> Cheers,
> Gyula

On Sun, Nov 6, 2022 at 5:19 PM Dong Lin <lindon...@gmail.com> wrote:

Hi Max,

Thank you for the proposal. The proposal tackles a very important issue for Flink users and the design looks promising overall!

I have some questions to better understand the proposed public interfaces and the algorithm.

1) The proposal seems to assume that the operator's busyTimeMsPerSecond could reach 1 sec. I believe this is mostly true for CPU-bound operators. Could you confirm that this can also be true for IO-bound operators such as sinks? For example, suppose a Kafka Sink subtask has reached an I/O bottleneck when flushing data out to the Kafka clusters: will busyTimeMsPerSecond reach 1 sec?

2) It is said that "users can configure a maximum time to fully process the backlog".
The configuration section does not seem to provide this config. Could you specify this? And any chance this proposal can provide the formula for calculating the new processing rate?

3) How are users expected to specify the per-operator configs (e.g. target utilization)? For example, should users specify them programmatically in a DataStream/Table/SQL API?

4) How often will the Flink Kubernetes operator query metrics from the JobManager? Is this configurable?

5) Could you specify the config name and default value for the proposed configs?

6) Could you add the name/mbean/type for the proposed metrics?

Cheers,
Dong

--
Best
ConradJam