> They will struggle if they add new resources and nothing happens for 5
> minutes.

The same applies if they start playing with FLIP-291 APIs. I'm wondering if
the cooldown makes sense there since it was the user's deliberate choice to
push new requirements. 🤔
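
As a reading aid, here is a minimal Java sketch of the rescale decision as it is discussed further down in this thread (scaling-interval.min as the cooldown, scaling-interval.max as the forced rescale, min-parallelism-increase as the threshold). It is not Flink's actual code: the class, the `Decision` enum and the `decide` method are hypothetical names, and it only covers scale ups. The guard that even a forced rescale needs a positive parallelism increase is an assumption, added to illustrate how the "add a TM, then remove it" case could avoid a needless restart.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of the cooldown decision; not the AdaptiveScheduler implementation. */
public class RescaleCooldownSketch {

    public enum Decision {
        RESCALE_NOW,
        SCHEDULE_CHECK_AFTER_COOLDOWN,
        WAIT_FOR_MAX_INTERVAL,
        NOTHING_TO_DO
    }

    public static Decision decide(
            Instant now,
            Instant lastRescale,
            Duration scalingIntervalMin,
            Duration scalingIntervalMax,
            int currentParallelism,
            int availableParallelism,
            int minParallelismIncrease) {
        int increase = availableParallelism - currentParallelism;
        // Assumption: without any gain there is nothing to do, even past the max interval.
        if (increase <= 0) {
            return Decision.NOTHING_TO_DO;
        }
        Duration sinceLastRescale = Duration.between(lastRescale, now);
        // Cooldown still running: re-check at lastRescale + scalingIntervalMin.
        if (sinceLastRescale.compareTo(scalingIntervalMin) < 0) {
            return Decision.SCHEDULE_CHECK_AFTER_COOLDOWN;
        }
        // Cooldown over and the increase is large enough: rescale.
        if (increase >= minParallelismIncrease) {
            return Decision.RESCALE_NOW;
        }
        // Increase is too small, but the max interval has passed: force the rescale.
        if (sinceLastRescale.compareTo(scalingIntervalMax) >= 0) {
            return Decision.RESCALE_NOW;
        }
        return Decision.WAIT_FOR_MAX_INTERVAL;
    }

    public static void main(String[] args) {
        Instant last = Instant.parse("2023-07-04T09:00:00Z");
        Duration min = Duration.ofSeconds(30);
        Duration max = Duration.ofHours(1);
        // Current parallelism 4, min-parallelism-increase 2.
        System.out.println(decide(last.plusSeconds(10), last, min, max, 4, 6, 2));
        System.out.println(decide(last.plusSeconds(300), last, min, max, 4, 6, 2));
        System.out.println(decide(last.plusSeconds(300), last, min, max, 4, 5, 2));
        System.out.println(decide(last.plusSeconds(7200), last, min, max, 4, 5, 2));
        System.out.println(decide(last.plusSeconds(7200), last, min, max, 4, 4, 2));
    }
}
```

Whether requirements pushed deliberately through the FLIP-291 APIs should bypass the first branch is exactly the open question above; the sketch does not take a position on it.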

Best,
D.

On Tue, Jul 4, 2023 at 9:11 AM David Morávek <d...@apache.org> wrote:

> The FLIP reads sane to me. I'm unsure about the default values, though; 5
> minutes of wait time between rescales feels rather strict, and we should
> rethink it to provide a better out-of-the-box experience.
>
> I'd focus on newcomers trying AS / Reactive Mode out. They will struggle
> if they add new resources and nothing happens for 5 minutes. I'd suggest
> defaulting to
> *jobmanager.adaptive-scheduler.resource-stabilization-timeout* (which
> defaults to 10s).
>
> I'm still struggling to grasp the max interval (forced rescale). Ignoring the
> `AdaptiveScheduler#shouldRescale()`
> condition seems rather dangerous. Wouldn't a simple case where you add a
> new TM and remove it before the max interval is reached (so there is
> nothing to do) result in an unnecessary job restart?
>
> Best,
> D.
>
> On Thu, Jun 29, 2023 at 3:43 PM Etienne Chauchot <echauc...@apache.org>
> wrote:
>
>> Thanks Chesnay for your feedback. I have updated the FLIP. I'll start a
>> vote thread.
>>
>> Best
>>
>> Etienne
>>
>> On 28/06/2023 at 11:49, Chesnay Schepler wrote:
>> > > we should schedule a check that will rescale if
>> > min-parallelism-increase is met. Then, what is the use of the
>> > scaling-interval.max timeout in that context?
>> >
>> > To force a rescale if min-parallelism-increase is not met (but we
>> > could still run above the current parallelism).
>> >
>> > min-parallelism-increase is a trade-off between the cost of rescaling
>> > vs the performance benefit of the parallelism increase. Over time the
>> > balance tips more and more in favor of the parallelism increase, hence
>> > we should eventually rescale anyway even if the minimum isn't met, or
>> > at least give users the option to do so.
>> >
>> > > I meant the opposite: not having only the cooldown but having only
>> > the stabilization time. I must have missed something because what I
>> > wonder is: if every rescale entails a restart of the pipeline and
>> > every restart entails passing through the Waiting for Resources state,
>> > then why introduce a cooldown when each rescale already goes through a
>> > stable resource timeout?
>> >
>> > It is technically correct that the stable resource timeout can be used
>> > to limit the number of rescale operations per interval, however during
>> > that time the job isn't running, in contrast to the cooldown.
>> >
>> > Having both just gives you a lot more flexibility.
>> > "I want at most 1 rescale operation per hour, and wait at most 1
>> > minute for resources to stabilize when a rescale happens".
>> > You can't express this with only one of the options.
>> >
>> > On 20/06/2023 14:41, Etienne Chauchot wrote:
>> >> Hi Chesnay,
>> >>
>> >> Thanks for your feedback. Comments inline
>> >>
>> >> On 16/06/2023 at 17:24, Chesnay Schepler wrote:
>> >>> 1) Options specific to the adaptive scheduler should start with
>> >>> "jobmanager.adaptive-scheduler".
>> >>
>> >>
>> >> ok
>> >>
>> >>
>> >>> 2)
>> >>> There isn't /really/ a notion of a "scaling event". The scheduler is
>> >>> informed about new/lost slots and job failures, and reacts
>> >>> accordingly by maybe rescaling the job.
>> >>> (sure, you can think of these as events, but you can think of
>> >>> practically everything as events)
>> >>>
>> >>> There shouldn't be a queue for events. All the scheduler should have
>> >>> to know is that the next rescale check is scheduled for time T,
>> >>> which in practice boils down to a flag and a scheduled action that
>> >>> runs Executing#maybeRescale.
>> >>
>> >>
>> >> Makes total sense, it's very simple like this. Thanks for the
>> >> clarification and the pointer. Having read the related FLIPs, I'll
>> >> look at the code now.
>> >>
>> >>
>> >>> With that in mind, we also have to look at how we keep this state
>> >>> around. Presumably it is scoped to the current state, such that the
>> >>> cooldown is reset if a job fails.
>> >>> Maybe we should add a separate ExecutingWithCooldown state; not sure
>> >>> yet.
>> >>
>> >>
>> >> Yes, losing the cooldown state and resetting the cooldown upon failure
>> >> is what I suggested in point 3 of my previous email. I'm not sure about
>> >> a new state either; I'll figure it out after experimenting with the
>> >> code. I'll update the FLIP then.
>> >>
>> >>
>> >>>
>> >>> It would be good to clarify whether this FLIP only attempts to cover
>> >>> scale up operations, or also scale downs in case of slot losses.
>> >>
>> >>
>> >> When slots are lost, most of the time it is due to a TM loss, so
>> >> several slots should be lost at the same time but (hopefully)
>> >> only once. There should not be many scale downs in a row (but
>> >> cascading failures can still happen). I think we should just protect
>> >> against scale ups immediately following. For that, I think we
>> >> could just keep the current behavior of transitioning to the Restarting
>> >> state and then back to the Waiting for Resources state. This state will
>> >> protect us against scale ups immediately following a failure/restart.
>> >>
>> >>
>> >>>
>> >>> We should also think about how it relates to the externalized
>> >>> declarative resource management. Should we always rescale
>> >>> immediately? Should we wait until the cooldown is over?
>> >>
>> >>
>> >> It relates to point 2, no? We should rescale immediately only if the
>> >> last rescale was done more than scaling-interval.min ago; otherwise we
>> >> schedule a rescale at last-rescale + scaling-interval.min.
>> >>
>> >>
>> >>> Related to this, there's the min-parallelism-increase option, that
>> >>> if for example set to "2" restricts rescale operations to only occur
>> >>> if the parallelism increases by at least 2.
>> >>
>> >>
>> >> yes I saw that in the code
>> >>
>> >>
>> >>> Ideally however there would be a max timeout for this.
>> >>>
>> >>> As such we could maybe think about this a bit differently:
>> >>> Add 2 new options instead of 1:
>> >>> jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time
>> >>> the scheduler will wait for the next effective rescale operation.
>> >>> jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time
>> >>> the scheduler will wait for the next effective rescale operation.
>> >>
>> >>
>> >> At point 2, we said that when slots change (requirements change or
>> >> new slots become available), if the last rescale check (call to
>> >> maybeRescale) was done less than scaling-interval.min ago, we should
>> >> schedule a check that will rescale if min-parallelism-increase is met.
>> >> Then, what is the use of the scaling-interval.max timeout in that context?
>> >>
>> >>
>> >>>
>> >>> 3) It sounds fine that we lose the cooldown state, because imo we
>> >>> want to reset the cooldown anyway on job failures (because a job
>> >>> failure inherently implies a potential rescaling).
>> >>
>> >>
>> >> exactly.
>> >>
>> >>
>> >>>
>> >>> 4) The stabilization time isn't really redundant and serves a
>> >>> different use-case. The idea behind it is that if a user adds multiple
>> >>> TMs at once then we don't want to rescale immediately at the first
>> >>> received slot. Without the stabilization time the cooldown would
>> >>> actually cause bad behavior here, because not only would we rescale
>> >>> immediately upon receiving the minimum required slots to scale up,
>> >>> but we also wouldn't use the remaining slots just because the
>> >>> cooldown says so.
>> >>
>> >>
>> >> I meant the opposite: not having only the cooldown but having only
>> >> the stabilization time. I must have missed something because what I
>> >> wonder is: if every rescale entails a restart of the pipeline and
>> >> every restart entails passing through the Waiting for Resources state,
>> >> then why introduce a cooldown when each rescale already goes through a
>> >> stable resource timeout?
>> >>
>> >>
>> >> Best
>> >>
>> >> Etienne
>> >>
>> >>
>> >>
>> >>>
>> >>> On 16/06/2023 15:47, Etienne Chauchot wrote:
>> >>>> Hi Robert,
>> >>>>
>> >>>> Thanks for your feedback. I don't know the scheduler part well
>> >>>> enough yet and I'm taking this ticket as a learning workshop.
>> >>>>
>> >>>> Regarding your comments:
>> >>>>
>> >>>> 1. Looking at the AdaptiveScheduler class, which takes all its
>> >>>> configuration from the JobManagerOptions, and to be consistent
>> >>>> with the other parameter names, I'd suggest
>> >>>> /jobmanager.scheduler-scaling-cooldown-period/
>> >>>>
>> >>>> 2. I thought scaling events existed already and the scheduler
>> >>>> received them as mentioned in FLIP-160 (cf "Whenever the scheduler
>> >>>> is in the Executing state and receives new slots") or in FLIP-138
>> >>>> (cf "Whenever new slots are available the SlotPool notifies the
>> >>>> Scheduler"). If it is not the case (it is the scheduler who asks
>> >>>> for slots), then there is indeed no need for storing scaling
>> >>>> requests.
>> >>>>
>> >>>> => I need a confirmation here
>> >>>>
>> >>>> 3. If we lose the JobManager, we lose both the AdaptiveScheduler
>> >>>> state and the CoolDownTimer state. So, upon recovery, it would be
>> >>>> as if there was no ongoing coolDown period. So, a first re-scale
>> >>>> could happen right away and it would start a coolDown period. A
>> >>>> second re-scale would have to wait for the end of this period.
>> >>>>
>> >>>> 4. When a pipeline is re-scaled, it is restarted. Upon restart, the
>> >>>> AdaptiveScheduler passes again through the "waiting for resources" state
>> >>>> as FLIP-160 suggests. If so, then it seems that the coolDown period
>> >>>> is kind of redundant with the resource-stabilization-timeout. I
>> >>>> guess it is not the case, otherwise the FLINK-21883 ticket would not
>> >>>> have been created.
>> >>>>
>> >>>> => I need a confirmation here also.
>> >>>>
>> >>>>
>> >>>> Thanks for your views on point 2 and 4.
>> >>>>
>> >>>>
>> >>>> Best
>> >>>>
>> >>>> Etienne
>> >>>>
>> >>>> On 15/06/2023 at 13:35, Robert Metzger wrote:
>> >>>>> Thanks for the FLIP.
>> >>>>>
>> >>>>> Some comments:
>> >>>>> 1. Can you specify the full proposed configuration name? "
>> >>>>> scaling-cooldown-period" is probably not the full config name?
>> >>>>> 2. Why is the concept of scaling events and a scaling queue
>> >>>>> needed? If I
>> >>>>> remember correctly, the adaptive scheduler will just check how many
>> >>>>> TaskManagers are available and then adjust the execution graph
>> >>>>> accordingly.
>> >>>>> There's no need to store a number of scaling events. We just need to
>> >>>>> determine the time to trigger an adjustment of the execution graph.
>> >>>>> 3. What's the behavior wrt to JobManager failures (e.g. we lose
>> >>>>> the state
>> >>>>> of the Adaptive Scheduler?). My proposal would be to just reset the
>> >>>>> cooldown period, so after recovery of a JobManager, we have to
>> >>>>> wait at
>> >>>>> least for the cooldown period until further scaling operations are
>> >>>>> done.
>> >>>>> 4. What's the relationship to the
>> >>>>> "jobmanager.adaptive-scheduler.resource-stabilization-timeout"
>> >>>>> configuration?
>> >>>>>
>> >>>>> Thanks a lot for working on this!
>> >>>>>
>> >>>>> Best,
>> >>>>> Robert
>> >>>>>
>> >>>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne
>> >>>>> Chauchot<echauc...@apache.org>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Hi all,
>> >>>>>>
>> >>>>>> @Yuxia, I updated the FLIP to include the aggregation of the stacked
>> >>>>>> operations that we discussed below. PTAL.
>> >>>>>>
>> >>>>>> Best
>> >>>>>>
>> >>>>>> Etienne
>> >>>>>>
>> >>>>>>
>> >>>>>> On 13/06/2023 at 16:31, Etienne Chauchot wrote:
>> >>>>>>> Hi Yuxia,
>> >>>>>>>
>> >>>>>>> Thanks for your feedback. The number of potentially stacked
>> >>>>>>> operations
>> >>>>>>> depends on the configured length of the cooldown period.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> The proposal in the FLIP is to add a minimum delay between 2 scaling
>> >>>>>>> operations. But, indeed, an optimization could be to still stack the
>> >>>>>>> operations (that arrive during a cooldown period) but, rather than
>> >>>>>>> taking only the last operation, aggregate them in order to end up
>> >>>>>>> with a single aggregated operation when the cooldown period ends. For
>> >>>>>>> example, let's say 3 taskManagers come up and 1 comes down during the
>> >>>>>>> cooldown period, we could generate a single operation of scale up +2
>> >>>>>>> when the period ends.
>> >>>>>>>
>> >>>>>>> As a side note regarding your comment on "it'll take a long time to
>> >>>>>>> finish all", please keep in mind that the reactive mode (at least for
>> >>>>>>> now) is only available for streaming pipelines, which are in essence
>> >>>>>>> infinite processing.
>> >>>>>>>
>> >>>>>>> Another side note: when you mention "every taskManagers
>> >>>>>>> connecting",
>> >>>>>>> if you are referring to the start of the pipeline, please keep
>> >>>>>>> in mind
>> >>>>>>> that the adaptive scheduler has a "waiting for resources" timeout
>> >>>>>>> period before starting the pipeline in which all taskmanagers
>> >>>>>>> connect
>> >>>>>>> and the parallelism is decided.
>> >>>>>>>
>> >>>>>>> Best
>> >>>>>>>
>> >>>>>>> Etienne
>> >>>>>>>
>> >>>>>>> On 13/06/2023 at 03:58, yuxia wrote:
>> >>>>>>>> Hi, Etienne. Thanks for driving it. I have one question about the
>> >>>>>>>> mechanism of the cooldown timeout.
>> >>>>>>>>
>> >>>>>>>> From the Proposed Changes part, if a scaling event is received and
>> >>>>>>>> it falls during the cooldown period, it'll be stacked to be executed
>> >>>>>>>> after the period ends. Also, from the description of FLINK-21883[1],
>> >>>>>>>> the cooldown timeout is to avoid rescaling the job very frequently,
>> >>>>>>>> because TaskManagers are not all connecting at the same time.
>> >>>>>>>>
>> >>>>>>>> So, is it possible that every taskmanager connecting will produce a
>> >>>>>>>> scaling event, and that these get stacked into many scale up events,
>> >>>>>>>> so that it takes a long time to finish them all? Can we just take the
>> >>>>>>>> last event?
>> >>>>>>>>
>> >>>>>>>> [1]:https://issues.apache.org/jira/browse/FLINK-21883
>> >>>>>>>>
>> >>>>>>>> Best regards, Yuxia
>> >>>>>>>>
>> >>>>>>>> ----- Original Message -----
>> >>>>>>>> From: "Etienne Chauchot"<echauc...@apache.org>
>> >>>>>>>> To: "dev"<dev@flink.apache.org>, "Robert Metzger"<metrob...@gmail.com>
>> >>>>>>>> Sent: Monday, June 12, 2023, 11:34:25 PM
>> >>>>>>>> Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
>> >>>>>>>>
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>> I’d like to start a discussion about FLIP-322 [1] which
>> >>>>>>>> introduces a
>> >>>>>>>> cooldown period for the adaptive scheduler.
>> >>>>>>>>
>> >>>>>>>> I'd like to get your feedback, especially @Robert as you opened the
>> >>>>>>>> related ticket and worked on the reactive mode a lot.
>> >>>>>>>>
>> >>>>>>>> [1]
>> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
>> >>>>>>>>
>> >>>>>>>> Best
>> >>>>>>>> Etienne
>> >>>
>> >>>
>> >
>
>
