> They will struggle if they add new resources and nothing happens
> for 5 minutes.
The same applies if they start playing with the FLIP-291 APIs. I'm
wondering whether the cooldown makes sense there, since it was the
user's deliberate choice to push new requirements. 🤔
Best,
D.
On Tue, Jul 4, 2023 at 9:11 AM David Morávek <d...@apache.org> wrote:
The FLIP reads sane to me. I'm unsure about the default values,
though; 5 minutes of wait time between rescales feels rather
strict, and we should rethink it to provide a better
out-of-the-box experience.
I'd focus on newcomers trying AS / Reactive Mode out. They will
struggle if they add new resources and nothing happens for 5
minutes. I'd suggest defaulting to
/jobmanager.adaptive-scheduler.resource-stabilization-timeout/ (which
defaults to 10s).
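As a sketch of that suggestion (the cooldown key here is the one
proposed in this thread, not an existing option; only the
stabilization timeout exists today):

    Configuration conf = new Configuration();
    // existing option, defaults to 10 s
    conf.setString(
        "jobmanager.adaptive-scheduler.resource-stabilization-timeout", "10 s");
    // proposed cooldown, defaulted to the same value as suggested above
    conf.setString(
        "jobmanager.adaptive-scheduler.scaling-interval.min", "10 s");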
I'm still struggling to grasp the max interval (force rescale).
Ignoring the `AdaptiveScheduler#shouldRescale()` condition seems
rather dangerous. Wouldn't a simple case where you add a new TM and
remove it before the max interval is reached (so there is nothing to
do) result in an unnecessary job restart?
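To make the concern concrete, here is a minimal sketch (hypothetical
names, not the actual AdaptiveScheduler code) of the kind of re-check
that would avoid such a restart:

    class ForcedRescaleCheck {
        boolean shouldRescale() { return false; } // stub: desired vs. current parallelism
        void rescale() {}                         // stub: restart at the new parallelism

        // When the max interval elapses, re-evaluate instead of rescaling blindly.
        void onMaxIntervalElapsed() {
            if (!shouldRescale()) {
                return; // the added TM is gone again: nothing to gain from a restart
            }
            rescale();
        }
    }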
Best,
D.
On Thu, Jun 29, 2023 at 3:43 PM Etienne Chauchot
<echauc...@apache.org> wrote:
Thanks, Chesnay, for your feedback. I have updated the FLIP. I'll
start a vote thread.
Best
Etienne
On 28/06/2023 11:49, Chesnay Schepler wrote:
> > we should schedule a check that will rescale if
> > min-parallelism-increase is met. Then, what is the use of the
> > scaling-interval.max timeout in that context?
>
> To force a rescale if min-parallelism-increase is not met (but we
> could still run above the current parallelism).
>
> min-parallelism-increase is a trade-off between the cost of
> rescaling vs the performance benefit of the parallelism increase.
> Over time the balance tips more and more in favor of the parallelism
> increase, hence we should eventually rescale anyway even if the
> minimum isn't met, or at least give users the option to do so.
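> A sketch of that decision rule (illustrative only; field names are
> made up, assuming java.time.Duration):
>
>     // Rescale if the gain is big enough, or if we've waited long enough
>     // that even a small gain outweighs the cost of restarting.
>     boolean shouldTriggerRescale(
>             int currentParallelism, int desiredParallelism,
>             Duration timeSinceLastRescale) {
>         if (desiredParallelism <= currentParallelism) {
>             return false;
>         }
>         int increase = desiredParallelism - currentParallelism;
>         return increase >= minParallelismIncrease
>                 || timeSinceLastRescale.compareTo(scalingIntervalMax) >= 0;
>     }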
>
> > I meant the opposite: not having only the cooldown but having only
> > the stabilization time. I must have missed something because what
> > I wonder is: if every rescale entails a restart of the pipeline
> > and every restart entails passing through the waiting-for-resources
> > state, then why introduce a cooldown when there is already, at
> > each rescale, a stable resource timeout?
>
> It is technically correct that the stable resource timeout can be
> used to limit the number of rescale operations per interval;
> however, during that time the job isn't running, in contrast to the
> cooldown.
>
> Having both just gives you a lot more flexibility.
> "I want at most 1 rescale operation per hour, and wait at
most 1
> minute for resource to stabilize when a rescale happens".
> You can't express this with only one of the options.
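> Expressed as configuration, that example might look like this
> (scaling-interval.min is the cooldown option proposed in this
> thread; the stabilization timeout already exists):
>
>     Configuration conf = new Configuration();
>     // at most one rescale operation per hour
>     conf.setString("jobmanager.adaptive-scheduler.scaling-interval.min", "1 h");
>     // wait at most 1 minute for resources to stabilize when a rescale happens
>     conf.setString(
>         "jobmanager.adaptive-scheduler.resource-stabilization-timeout", "1 min");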
>
> On 20/06/2023 14:41, Etienne Chauchot wrote:
>> Hi Chesnay,
>>
>> Thanks for your feedback. Comments inline
>>
>>> On 16/06/2023 17:24, Chesnay Schepler wrote:
>>> 1) Options specific to the adaptive scheduler should start with
>>> "jobmanager.adaptive-scheduler".
>>
>>
>> ok
>>
>>
>>> 2)
>>> There isn't /really/ a notion of a "scaling event". The scheduler
>>> is informed about new/lost slots and job failures, and reacts
>>> accordingly by maybe rescaling the job.
>>> (sure, you can think of these as events, but you can think of
>>> practically everything as events)
>>>
>>> There shouldn't be a queue for events. All the scheduler should
>>> have to know is that the next rescale check is scheduled for time
>>> T, which in practice boils down to a flag and a scheduled action
>>> that runs Executing#maybeRescale.
>>
>>
>> Makes total sense, it's very simple like this. Thanks for the
>> clarification and the pointer. After the related FLIPs, I'll look
>> at the code now.
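>> To check my understanding, that "flag plus scheduled action" could
>> look roughly like this (made-up helper names):
>>
>>     // At most one pending check; when it fires, run Executing#maybeRescale.
>>     private boolean rescaleCheckScheduled = false;
>>
>>     void onNewResources() {
>>         if (rescaleCheckScheduled) {
>>             return; // a check is already scheduled for time T
>>         }
>>         rescaleCheckScheduled = true;
>>         scheduleAt(nextRescaleCheckTime(), () -> {
>>             rescaleCheckScheduled = false;
>>             maybeRescale();
>>         });
>>     }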
>>
>>
>>> With that in mind, we also have to look at how we keep this state
>>> around. Presumably it is scoped to the current state, such that
>>> the cooldown is reset if a job fails.
>>> Maybe we should add a separate ExecutingWithCooldown state; not
>>> sure yet.
>>
>>
>> Yes, losing the cooldown state and resetting the cooldown upon
>> failure is what I suggested in point 3 of the previous email. Not
>> sure about a new state either; I'll figure it out after
>> experimenting with the code. I'll update the FLIP then.
>>
>>
>>>
>>> It would be good to clarify whether this FLIP only attempts to
>>> cover scale-up operations, or also scale-downs in case of slot
>>> losses.
>>
>>
>> When slots are lost, most of the time it is due to the loss of a
>> TM, so several slots should be lost at the same time but
>> (hopefully) only once. There should not be many scale-downs in a
>> row (although cascading failures can happen). I think we should
>> just protect against scale-ups immediately following. For that, we
>> could keep the current behavior of transitioning to the Restarting
>> state and then back to the Waiting for Resources state. This state
>> will protect us against scale-ups immediately following a
>> failure/restart.
>>
>>
>>>
>>> We should also think about how it relates to the externalized
>>> declarative resource management. Should we always rescale
>>> immediately? Should we wait until the cooldown is over?
>>
>>
>> It relates to point 2, no? We should rescale immediately only if
>> the last rescale was done more than scaling-interval.min ago;
>> otherwise, schedule a rescale at last-rescale +
>> scaling-interval.min.
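>> In other words (illustrative sketch, made-up names, assuming
>> java.time):
>>
>>     Instant earliest = lastRescale.plus(scalingIntervalMin);
>>     if (!clock.instant().isBefore(earliest)) {
>>         maybeRescale();                           // cooldown already elapsed
>>     } else {
>>         scheduleAt(earliest, this::maybeRescale); // defer to last-rescale + min
>>     }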
>>
>>
>>> Related to this, there's the min-parallelism-increase option
>>> which, if for example set to "2", restricts rescale operations to
>>> only occur if the parallelism increases by at least 2.
>>
>>
>> Yes, I saw that in the code.
>>
>>
>>> Ideally, however, there would be a max timeout for this.
>>>
>>> As such we could maybe think about this a bit differently:
>>> Add 2 new options instead of 1:
>>> jobmanager.adaptive-scheduler.scaling-interval.min: The minimum
>>> time the scheduler will wait for the next effective rescale
>>> operation.
>>> jobmanager.adaptive-scheduler.scaling-interval.max: The maximum
>>> time the scheduler will wait for the next effective rescale
>>> operation.
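>>> Sketched as ConfigOption definitions (the defaults shown here are
>>> placeholders, not part of the proposal):
>>>
>>>     public static final ConfigOption<Duration> SCALING_INTERVAL_MIN =
>>>         ConfigOptions.key("jobmanager.adaptive-scheduler.scaling-interval.min")
>>>             .durationType()
>>>             .defaultValue(Duration.ofSeconds(30))
>>>             .withDescription("Minimum time between two effective rescale operations.");
>>>
>>>     public static final ConfigOption<Duration> SCALING_INTERVAL_MAX =
>>>         ConfigOptions.key("jobmanager.adaptive-scheduler.scaling-interval.max")
>>>             .durationType()
>>>             .noDefaultValue()
>>>             .withDescription("Maximum time the scheduler waits before forcing a rescale.");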
>>
>>
>> At point 2, we said that when slots change (requirements change or
>> new slots become available), if the last rescale check (call to
>> maybeRescale) was done less than scaling-interval.min ago, we
>> should schedule a check that will rescale if
>> min-parallelism-increase is met. Then, what is the use of the
>> scaling-interval.max timeout in that context?
>>
>>
>>>
>>> 3) It sounds fine that we lose the cooldown state, because imo we
>>> want to reset the cooldown anyway on job failures (because a job
>>> failure inherently implies a potential rescaling).
>>
>>
>> exactly.
>>
>>
>>>
>>> 4) The stabilization time isn't really redundant and serves a
>>> different use-case. The idea behind it is that if a user adds
>>> multiple TMs at once, then we don't want to rescale immediately at
>>> the first received slot. Without the stabilization time the
>>> cooldown would actually cause bad behavior here, because not only
>>> would we rescale immediately upon receiving the minimum required
>>> slots to scale up, but we also wouldn't use the remaining slots
>>> just because the cooldown says so.
>>
>>
>> I meant the opposite: not having only the cooldown but having only
>> the stabilization time. I must have missed something because what I
>> wonder is: if every rescale entails a restart of the pipeline and
>> every restart entails passing through the waiting-for-resources
>> state, then why introduce a cooldown when there is already, at each
>> rescale, a stable resource timeout?
>>
>>
>> Best
>>
>> Etienne
>>
>>
>>
>>>
>>> On 16/06/2023 15:47, Etienne Chauchot wrote:
>>>> Hi Robert,
>>>>
>>>> Thanks for your feedback. I don't know the scheduler part well
>>>> enough yet and I'm taking this ticket as a learning exercise.
>>>>
>>>> Regarding your comments:
>>>>
>>>> 1. Looking at the AdaptiveScheduler class, which takes all its
>>>> configuration from JobManagerOptions, and to be consistent with
>>>> other parameter names, I'd suggest
>>>> /jobmanager.scheduler-scaling-cooldown-period/
>>>>
>>>> 2. I thought scaling events already existed and that the
>>>> scheduler received them, as mentioned in FLIP-160 (cf. "Whenever
>>>> the scheduler is in the Executing state and receives new slots")
>>>> or in FLIP-138 (cf. "Whenever new slots are available the
>>>> SlotPool notifies the Scheduler"). If that is not the case (it is
>>>> the scheduler that asks for slots), then there is indeed no need
>>>> to store scaling requests.
>>>>
>>>> => I need a confirmation here
>>>>
>>>> 3. If we lose the JobManager, we lose both the AdaptiveScheduler
>>>> state and the CoolDownTimer state. So, upon recovery, it would be
>>>> as if there were no ongoing cooldown period: a first rescale
>>>> could happen right away and it would start a cooldown period; a
>>>> second rescale would have to wait for the end of this period.
>>>>
>>>> 4. When a pipeline is rescaled, it is restarted. Upon restart,
>>>> the AdaptiveScheduler passes again through the "waiting for
>>>> resources" state, as FLIP-160 suggests. If so, then the cooldown
>>>> period seems kind of redundant with the
>>>> resource-stabilization-timeout. I guess that is not the case,
>>>> otherwise the FLINK-21883 ticket would not have been created.
>>>>
>>>> => I need a confirmation here also.
>>>>
>>>>
>>>> Thanks for your views on point 2 and 4.
>>>>
>>>>
>>>> Best
>>>>
>>>> Etienne
>>>>
>>>>> On 15/06/2023 13:35, Robert Metzger wrote:
>>>>> Thanks for the FLIP.
>>>>>
>>>>> Some comments:
>>>>> 1. Can you specify the full proposed configuration name?
>>>>> "scaling-cooldown-period" is probably not the full config name?
>>>>> 2. Why is the concept of scaling events and a scaling queue
>>>>> needed? If I remember correctly, the adaptive scheduler will
>>>>> just check how many TaskManagers are available and then adjust
>>>>> the execution graph accordingly.
>>>>> There's no need to store a number of scaling events. We just
>>>>> need to determine the time to trigger an adjustment of the
>>>>> execution graph.
>>>>> 3. What's the behavior wrt JobManager failures (e.g. we lose
>>>>> the state of the Adaptive Scheduler)? My proposal would be to
>>>>> just reset the cooldown period, so after recovery of a
>>>>> JobManager, we have to wait at least for the cooldown period
>>>>> until further scaling operations are done.
>>>>> 4. What's the relationship to the
>>>>> "jobmanager.adaptive-scheduler.resource-stabilization-timeout"
>>>>> configuration?
>>>>>
>>>>> Thanks a lot for working on this!
>>>>>
>>>>> Best,
>>>>> Robert
>>>>>
>>>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne Chauchot
>>>>> <echauc...@apache.org> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> @Yuxia, I updated the FLIP to include the aggregation of the
>>>>>> stacked operations that we discussed below. PTAL.
>>>>>>
>>>>>> Best
>>>>>>
>>>>>> Etienne
>>>>>>
>>>>>>
>>>>>> On 13/06/2023 16:31, Etienne Chauchot wrote:
>>>>>>> Hi Yuxia,
>>>>>>>
>>>>>>> Thanks for your feedback. The number of potentially stacked
>>>>>>> operations depends on the configured length of the cooldown
>>>>>>> period.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> The proposition in the FLIP is to add a minimum delay between
>>>>>>> 2 scaling operations. But, indeed, an optimization could be to
>>>>>>> still stack the operations that arrive during a cooldown
>>>>>>> period, and rather than taking only the last operation,
>>>>>>> aggregate them so as to end up with a single aggregated
>>>>>>> operation when the cooldown period ends. For example, let's
>>>>>>> say 3 taskManagers come up and 1 comes down during the
>>>>>>> cooldown period: we could generate a single scale-up operation
>>>>>>> of +2 when the period ends.
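>>>>>>> A sketch of that aggregation (hypothetical event type):
>>>>>>>
>>>>>>>     // Sum the deltas stacked during the cooldown, e.g. +1 +1 +1 -1 => +2,
>>>>>>>     // and emit a single rescale operation when the period ends.
>>>>>>>     int aggregatedDelta = stackedEvents.stream()
>>>>>>>             .mapToInt(ScalingEvent::taskManagerDelta)
>>>>>>>             .sum();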
>>>>>>>
>>>>>>> As a side note regarding your comment on "it'll take a long
>>>>>>> time to finish all", please keep in mind that the reactive
>>>>>>> mode (at least for now) is only available for streaming
>>>>>>> pipelines, which are in essence infinite processing.
>>>>>>>
>>>>>>> Another side note: when you mention "every taskmanager
>>>>>>> connecting", if you are referring to the start of the
>>>>>>> pipeline, please keep in mind that the adaptive scheduler has
>>>>>>> a "waiting for resources" timeout period before starting the
>>>>>>> pipeline, during which all taskManagers connect and the
>>>>>>> parallelism is decided.
>>>>>>>
>>>>>>> Best
>>>>>>>
>>>>>>> Etienne
>>>>>>>
>>>>>>> On 13/06/2023 03:58, yuxia wrote:
>>>>>>>> Hi, Etienne. Thanks for driving it. I have one question about
>>>>>>>> the mechanism of the cooldown timeout.
>>>>>>>>
>>>>>>>> From the Proposed Changes part, if a scaling event is
>>>>>>>> received and it falls during the cooldown period, it'll be
>>>>>>>> stacked to be executed after the period ends. Also, from the
>>>>>>>> description of FLINK-21883 [1], the cooldown timeout is meant
>>>>>>>> to avoid rescaling the job very frequently, because
>>>>>>>> TaskManagers do not all connect at the same time.
>>>>>>>>
>>>>>>>> So, is it possible that every taskmanager connecting will
>>>>>>>> produce a scaling event, and that many scale-up events will
>>>>>>>> be stacked, so that it'll take a long time to finish them
>>>>>>>> all? Can we just take the last event?
>>>>>>>>
>>>>>>>> [1]: https://issues.apache.org/jira/browse/FLINK-21883
>>>>>>>>
>>>>>>>> Best regards, Yuxia
>>>>>>>>
>>>>>>>> ----- Original Message -----
>>>>>>>> From: "Etienne Chauchot" <echauc...@apache.org>
>>>>>>>> To: "dev" <dev@flink.apache.org>, "Robert Metzger" <metrob...@gmail.com>
>>>>>>>> Sent: Monday, June 12, 2023, 11:34:25 PM
>>>>>>>> Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I’d like to start a discussion about FLIP-322 [1] which
>>>>>>>> introduces a cooldown period for the adaptive scheduler.
>>>>>>>>
>>>>>>>> I'd like to get your feedback, especially @Robert, as you
>>>>>>>> opened the related ticket and worked a lot on the reactive
>>>>>>>> mode.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Etienne
>>>
>>>
>