> They will struggle if they add new resources and nothing happens for 5 minutes.
The same applies if they start playing with the FLIP-291 APIs. I'm wondering if the cooldown makes sense there, since it was the user's deliberate choice to push new requirements. 🤔

Best,
D.

On Tue, Jul 4, 2023 at 9:11 AM David Morávek <d...@apache.org> wrote:

> The FLIP reads sane to me. I'm unsure about the default values, though; 5 minutes of wait time between rescales feels rather strict, and we should rethink it to provide a better out-of-the-box experience.
>
> I'd focus on newcomers trying AS / Reactive Mode out. They will struggle if they add new resources and nothing happens for 5 minutes. I'd suggest defaulting to *jobmanager.adaptive-scheduler.resource-stabilization-timeout* (which defaults to 10s).
>
> I'm still struggling to grasp the max interval (force rescale). Ignoring the `AdaptiveScheduler#shouldRescale()` condition seems rather dangerous. Wouldn't a simple case where you add a new TM and remove it before the max interval is reached (so there is nothing to do) result in an unnecessary job restart?
>
> Best,
> D.
>
> On Thu, Jun 29, 2023 at 3:43 PM Etienne Chauchot <echauc...@apache.org> wrote:
>
>> Thanks Chesnay for your feedback. I have updated the FLIP. I'll start a vote thread.
>>
>> Best
>>
>> Etienne
>>
>> Le 28/06/2023 à 11:49, Chesnay Schepler a écrit :
>> > > we should schedule a check that will rescale if min-parallelism-increase is met. Then, what is the use of the scaling-interval.max timeout in that context?
>> >
>> > To force a rescale if min-parallelism-increase is not met (but we could still run above the current parallelism).
>> >
>> > min-parallelism-increase is a trade-off between the cost of rescaling vs the performance benefit of the parallelism increase. Over time the balance tips more and more in favor of the parallelism increase, hence we should eventually rescale anyway even if the minimum isn't met, or at least give users the option to do so.
>> >
>> > > I meant the opposite: not having only the cooldown but having only the stabilization time. I must have missed something, because what I wonder is: if every rescale entails a restart of the pipeline, and every restart entails passing through the "waiting for resources" state, then why introduce a cooldown when there is already a stable-resource timeout at each rescale?
>> >
>> > It is technically correct that the stable-resource timeout can be used to limit the number of rescale operations per interval, however during that time the job isn't running, in contrast to the cooldown.
>> >
>> > Having both just gives you a lot more flexibility: "I want at most 1 rescale operation per hour, and wait at most 1 minute for resources to stabilize when a rescale happens." You can't express this with only one of the options.
>> >
>> > On 20/06/2023 14:41, Etienne Chauchot wrote:
>> >> Hi Chesnay,
>> >>
>> >> Thanks for your feedback. Comments inline.
>> >>
>> >> Le 16/06/2023 à 17:24, Chesnay Schepler a écrit :
>> >>> 1) Options specific to the adaptive scheduler should start with "jobmanager.adaptive-scheduler".
>> >>
>> >> ok
>> >>
>> >>> 2) There isn't /really/ a notion of a "scaling event". The scheduler is informed about new/lost slots and job failures, and reacts accordingly by maybe rescaling the job. (Sure, you can think of these as events, but you can think of practically everything as events.)
>> >>>
>> >>> There shouldn't be a queue for events. All the scheduler should have to know is that the next rescale check is scheduled for time T, which in practice boils down to a flag and a scheduled action that runs Executing#maybeRescale.
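To illustrate the "flag plus scheduled action" idea quoted above, here is a minimal sketch using plain java.util.concurrent; the class and method names are hypothetical, and this is not the actual AdaptiveScheduler code, just one way the mechanism could look:

    import java.time.Duration;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    /** Hypothetical sketch of "next rescale check is scheduled for time T"; not Flink code. */
    class RescaleCheckScheduler {

        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();
        private boolean checkScheduled = false; // the "flag"

        /** Called whenever slots are gained or lost; schedules at most one future check. */
        synchronized void requestRescaleCheck(Duration delayUntilT, Runnable maybeRescale) {
            if (checkScheduled) {
                return; // a check is already pending for time T, nothing to queue
            }
            checkScheduled = true;
            executor.schedule(() -> {
                synchronized (this) {
                    checkScheduled = false;
                }
                // plays the role of Executing#maybeRescale: it re-inspects the current
                // slot situation, so intermediate events never need to be stored
                maybeRescale.run();
            }, delayUntilT.toMillis(), TimeUnit.MILLISECONDS);
        }
    }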
>> >>
>> >> Makes total sense, it's very simple that way. Thanks for the clarification and the pointer. After the related FLIPs, I'll now look at the code.
>> >>
>> >>> With that in mind, we also have to look at how we keep this state around. Presumably it is scoped to the current state, such that the cooldown is reset if a job fails. Maybe we should add a separate ExecutingWithCooldown state; not sure yet.
>> >>
>> >> Yes, losing the cooldown state and resetting the cooldown upon failure is what I suggested in point 3 of my previous email. I'm not sure about a new state either; I'll figure it out after experimenting with the code. I'll update the FLIP then.
>> >>
>> >>> It would be good to clarify whether this FLIP only attempts to cover scale-up operations, or also scale-downs in case of slot losses.
>> >>
>> >> When slots are lost, most of the time it is due to a TM loss, so several slots should be lost at the same time but (hopefully) only once. There should not be many scale-downs in a row (but cascading failures can still happen). I think we just need to protect against scale-ups immediately following. For that, I think we could keep the current behavior of transitioning to the Restarting state and then back to the Waiting for Resources state. This state will protect us against scale-ups immediately following a failure/restart.
>> >>
>> >>> We should also think about how it relates to the externalized declarative resource management. Should we always rescale immediately? Should we wait until the cooldown is over?
>> >>
>> >> It relates to point 2, no? We should rescale immediately only if the last rescale was done more than scaling-interval.min ago; otherwise, schedule a rescale at last-rescale + scaling-interval.min.
>> >>
>> >>> Related to this, there's the min-parallelism-increase option, which for example when set to "2" restricts rescale operations to only occur if the parallelism increases by at least 2.
>> >>
>> >> yes I saw that in the code
>> >>
>> >>> Ideally however there would be a max timeout for this.
>> >>>
>> >>> As such we could maybe think about this a bit differently and add 2 new options instead of 1:
>> >>> jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time the scheduler will wait for the next effective rescale operation.
>> >>> jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time the scheduler will wait for the next effective rescale operation.
>> >>
>> >> At point 2, we said that when slots change (requirements change or new slots become available), if the last rescale check (call to maybeRescale) was done less than scaling-interval.min ago, we should schedule a check that will rescale if min-parallelism-increase is met. Then, what is the use of the scaling-interval.max timeout in that context?
>> >>
>> >>> 3) It sounds fine that we lose the cooldown state, because imo we want to reset the cooldown anyway on job failures (because a job failure inherently implies a potential rescaling).
>> >>
>> >> Exactly.
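For concreteness, one possible reading of the scaling-interval.min / scaling-interval.max proposal above, combined with min-parallelism-increase, expressed as a single decision function. The option names come from this thread; the method and parameter names, and the guard against a zero increase, are illustrative assumptions rather than anything from the Flink code base:

    import java.time.Duration;

    /** Illustrative sketch of the proposed semantics; not taken from the Flink code base. */
    final class RescaleDecision {

        static boolean shouldRescale(
                int parallelismIncrease,       // how much the achievable parallelism would grow
                int minParallelismIncrease,    // min-parallelism-increase
                Duration sinceLastRescale,     // time elapsed since the last rescale
                Duration scalingIntervalMin,   // proposed scaling-interval.min (cooldown)
                Duration scalingIntervalMax) { // proposed scaling-interval.max (force rescale)

            if (parallelismIncrease <= 0) {
                return false; // nothing to gain, never restart just because time has passed
            }
            if (sinceLastRescale.compareTo(scalingIntervalMin) < 0) {
                return false; // still cooling down; a check would be re-scheduled for later
            }
            if (parallelismIncrease >= minParallelismIncrease) {
                return true; // the increase is large enough to pay for the restart
            }
            // below the minimum increase: only force the rescale once scaling-interval.max elapsed
            return sinceLastRescale.compareTo(scalingIntervalMax) >= 0;
        }
    }

Under this reading, adding a TM and then removing it again before scaling-interval.max elapses would not force a restart, because the achievable parallelism increase is back to zero when the check runs; whether the FLIP adopts such a guard is one of the open questions raised earlier in the thread.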
>> >>> 4) The stabilization time isn't really redundant and serves a different use-case. The idea behind it is that if a user adds multiple TMs at once, then we don't want to rescale immediately at the first received slot. Without the stabilization time the cooldown would actually cause bad behavior here, because not only would we rescale immediately upon receiving the minimum required slots to scale up, but we also wouldn't use the remaining slots just because the cooldown says so.
>> >>
>> >> I meant the opposite: not having only the cooldown but having only the stabilization time. I must have missed something, because what I wonder is: if every rescale entails a restart of the pipeline, and every restart entails passing through the "waiting for resources" state, then why introduce a cooldown when there is already a stable-resource timeout at each rescale?
>> >>
>> >> Best
>> >>
>> >> Etienne
>> >>
>> >>> On 16/06/2023 15:47, Etienne Chauchot wrote:
>> >>>> Hi Robert,
>> >>>>
>> >>>> Thanks for your feedback. I don't know the scheduler part well enough yet and I'm taking this ticket as a learning workshop.
>> >>>>
>> >>>> Regarding your comments:
>> >>>>
>> >>>> 1. Looking at the AdaptiveScheduler class, which takes all its configuration from JobManagerOptions, and to be consistent with the other parameter names, I'd suggest /jobmanager.scheduler-scaling-cooldown-period/
>> >>>>
>> >>>> 2. I thought scaling events already existed and the scheduler received them, as mentioned in FLIP-160 (cf. "Whenever the scheduler is in the Executing state and receives new slots") or in FLIP-138 (cf. "Whenever new slots are available the SlotPool notifies the Scheduler"). If that is not the case (it is the scheduler that asks for slots), then there is indeed no need to store scaling requests.
>> >>>>
>> >>>> => I need a confirmation here
>> >>>>
>> >>>> 3. If we lose the JobManager, we lose both the AdaptiveScheduler state and the CoolDownTimer state. So, upon recovery, it would be as if there was no ongoing coolDown period. A first re-scale could happen right away and it would start a coolDown period. A second re-scale would have to wait for the end of this period.
>> >>>>
>> >>>> 4. When a pipeline is re-scaled, it is restarted. Upon restart, the AdaptiveScheduler passes again through the "waiting for resources" state, as FLIP-160 suggests. If so, then it seems that the coolDown period is kind of redundant with the resource-stabilization-timeout. I guess that is not the case, otherwise the FLINK-21883 ticket would not have been created.
>> >>>>
>> >>>> => I need a confirmation here also.
>> >>>>
>> >>>> Thanks for your views on points 2 and 4.
>> >>>>
>> >>>> Best
>> >>>>
>> >>>> Etienne
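Etienne's point 3 above (losing the JobManager also loses the cooldown state) can be pictured with a tiny sketch. The names are hypothetical, and the only assumption is that the timestamp of the last rescale lives in JobManager memory alone:

    import java.time.Duration;
    import java.time.Instant;

    /** Hypothetical sketch; not the actual scheduler state handling. */
    final class CooldownTracker {

        // Kept only in memory: a JobManager failover creates a fresh tracker,
        // so the first rescale after recovery may happen right away.
        private Instant lastRescale; // null until the first rescale

        boolean inCooldown(Instant now, Duration cooldownPeriod) {
            return lastRescale != null
                    && Duration.between(lastRescale, now).compareTo(cooldownPeriod) < 0;
        }

        void onRescale(Instant now) {
            lastRescale = now; // starts a new cooldown period
        }
    }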
>> >>>> Le 15/06/2023 à 13:35, Robert Metzger a écrit :
>> >>>>> Thanks for the FLIP.
>> >>>>>
>> >>>>> Some comments:
>> >>>>> 1. Can you specify the full proposed configuration name? "scaling-cooldown-period" is probably not the full config name?
>> >>>>> 2. Why is the concept of scaling events and a scaling queue needed? If I remember correctly, the adaptive scheduler will just check how many TaskManagers are available and then adjust the execution graph accordingly. There's no need to store a number of scaling events. We just need to determine the time to trigger an adjustment of the execution graph.
>> >>>>> 3. What's the behavior wrt JobManager failures (e.g. we lose the state of the Adaptive Scheduler)? My proposal would be to just reset the cooldown period, so after recovery of a JobManager, we have to wait at least for the cooldown period until further scaling operations are done.
>> >>>>> 4. What's the relationship to the "jobmanager.adaptive-scheduler.resource-stabilization-timeout" configuration?
>> >>>>>
>> >>>>> Thanks a lot for working on this!
>> >>>>>
>> >>>>> Best,
>> >>>>> Robert
>> >>>>>
>> >>>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne Chauchot <echauc...@apache.org> wrote:
>> >>>>>
>> >>>>>> Hi all,
>> >>>>>>
>> >>>>>> @Yuxia, I updated the FLIP to include the aggregation of the stacked operations that we discussed below. PTAL.
>> >>>>>>
>> >>>>>> Best
>> >>>>>>
>> >>>>>> Etienne
>> >>>>>>
>> >>>>>> Le 13/06/2023 à 16:31, Etienne Chauchot a écrit :
>> >>>>>>> Hi Yuxia,
>> >>>>>>>
>> >>>>>>> Thanks for your feedback. The number of potentially stacked operations depends on the configured length of the cooldown period.
>> >>>>>>>
>> >>>>>>> The proposition in the FLIP is to add a minimum delay between 2 scaling operations. But, indeed, an optimization could be to still stack the operations that arrive during a cooldown period and, rather than taking only the last one, aggregate them so as to end up with a single aggregated operation when the cooldown period ends. For example, if 3 taskManagers come up and 1 comes down during the cooldown period, we could generate a single scale-up operation of +2 when the period ends.
>> >>>>>>>
>> >>>>>>> As a side note regarding your comment on "it'll take a long time to finish all", please keep in mind that the reactive mode (at least for now) is only available for streaming pipelines, which are in essence infinite processing.
>> >>>>>>>
>> >>>>>>> Another side note: when you mention "every taskManager connecting", if you are referring to the start of the pipeline, please keep in mind that the adaptive scheduler has a "waiting for resources" timeout period before starting the pipeline, during which all taskManagers connect and the parallelism is decided.
>> >>>>>>>
>> >>>>>>> Best
>> >>>>>>>
>> >>>>>>> Etienne
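As a sketch of the aggregation Etienne describes (names are hypothetical, not from the FLIP), the slot changes observed during a cooldown could be folded into a single net delta instead of a queue of events:

    import java.util.OptionalInt;

    /** Hypothetical sketch of aggregating slot changes observed during a cooldown period. */
    final class PendingSlotChanges {

        private int netSlotDelta = 0;

        void onTaskManagerAdded(int slots)   { netSlotDelta += slots; }
        void onTaskManagerRemoved(int slots) { netSlotDelta -= slots; }

        /** Called once when the cooldown ends; e.g. +3 and -1 during the period collapse into +2. */
        OptionalInt takeAggregatedChange() {
            int delta = netSlotDelta;
            netSlotDelta = 0;
            return delta == 0 ? OptionalInt.empty() : OptionalInt.of(delta);
        }
    }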
>> >>>>>>> Le 13/06/2023 à 03:58, yuxia a écrit :
>> >>>>>>>> Hi, Etienne. Thanks for driving it. I have one question about the mechanism of the cooldown timeout.
>> >>>>>>>>
>> >>>>>>>> From the Proposed Changes part, if a scaling event is received and it falls during the cooldown period, it'll be stacked to be executed after the period ends. Also, from the description of FLINK-21883 [1], the cooldown timeout is meant to avoid rescaling the job very frequently, because TaskManagers do not all connect at the same time.
>> >>>>>>>>
>> >>>>>>>> So, is it possible that every TaskManager connecting will produce a scaling event, so that many scale-up events get stacked and it takes a long time to finish them all? Can we just take the last event?
>> >>>>>>>>
>> >>>>>>>> [1]: https://issues.apache.org/jira/browse/FLINK-21883
>> >>>>>>>>
>> >>>>>>>> Best regards, Yuxia
>> >>>>>>>>
>> >>>>>>>> ----- Original Message ----- From: "Etienne Chauchot" <echauc...@apache.org> To: "dev" <dev@flink.apache.org>, "Robert Metzger" <metrob...@gmail.com> Sent: Monday, June 12, 2023, 11:34:25 PM Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
>> >>>>>>>>
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>> I'd like to start a discussion about FLIP-322 [1] which introduces a cooldown period for the adaptive scheduler.
>> >>>>>>>>
>> >>>>>>>> I'd like to get your feedback, especially @Robert, as you opened the related ticket and worked a lot on the reactive mode.
>> >>>>>>>>
>> >>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
>> >>>>>>>>
>> >>>>>>>> Best
>> >>>>>>>> Etienne