Hi Chesnay,
Thanks for your feedback. Comments inline.
On 16/06/2023 17:24, Chesnay Schepler wrote:
1) Options specific to the adaptive scheduler should start with
"jobmanager.adaptive-scheduler".
ok
2)
There isn't /really/ a notion of a "scaling event". The scheduler is
informed about new/lost slots and job failures, and reacts accordingly
by maybe rescaling the job.
(sure, you can think of these as events, but you can think of
practically everything as events)
There shouldn't be a queue for events. All the scheduler should have
to know is that the next rescale check is scheduled for time T, which
in practice boils down to a flag and a scheduled action that runs
Executing#maybeRescale.
Makes total sense, it's very simple like this. Thanks for the clarification
and the pointer. Now that I have read the related FLIPs, I'll look at the code.
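To make the "flag plus scheduled action" idea concrete, here is a minimal sketch. RescaleCheckScheduler and its delayed-executor callback are hypothetical names for illustration only, not existing Flink classes; the Runnable stands in for Executing#maybeRescale.

```java
import java.time.Duration;
import java.util.function.BiConsumer;

/** Hypothetical helper, not part of Flink: at most one pending rescale check. */
final class RescaleCheckScheduler {
    // Assumed abstraction over "run this action after that delay".
    private final BiConsumer<Runnable, Duration> delayedExecutor;
    // Stands in for Executing#maybeRescale.
    private final Runnable maybeRescale;
    // The flag: true while a check is scheduled but has not run yet.
    private boolean checkScheduled;

    RescaleCheckScheduler(BiConsumer<Runnable, Duration> delayedExecutor, Runnable maybeRescale) {
        this.delayedExecutor = delayedExecutor;
        this.maybeRescale = maybeRescale;
    }

    /** Schedules a check unless one is already pending. Returns true if a new check was scheduled. */
    synchronized boolean scheduleCheck(Duration delay) {
        if (checkScheduled) {
            return false; // no queue: the pending check will see the latest slots anyway
        }
        checkScheduled = true;
        delayedExecutor.accept(this::runCheck, delay);
        return true;
    }

    private void runCheck() {
        synchronized (this) {
            // Clear the flag first so a slot change during the check can schedule the next one.
            checkScheduled = false;
        }
        maybeRescale.run();
    }

    synchronized boolean isCheckScheduled() {
        return checkScheduled;
    }
}
```

Because the check looks at the slots available when it runs, nothing needs to be remembered about the individual slot notifications that arrived in between.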
With that in mind, we also have to look at how we keep this state
around. Presumably it is scoped to the current state, such that the
cooldown is reset if a job fails.
Maybe we should add a separate ExecutingWithCooldown state; not sure yet.
Yes, losing the cooldown state and resetting the cooldown upon failure is
what I suggested in point 3 of my previous email. I'm not sure about a new
state either; I'll figure it out after experimenting with the code and
update the FLIP then.
It would be good to clarify whether this FLIP only attempts to cover
scale up operations, or also scale downs in case of slot losses.
When slots are lost, most of the time it is due to a TM loss, so several
slots should be lost at the same time but (hopefully) only once. There
should not be many scale downs in a row (though cascading failures can
still happen). I think we should just protect against scale ups
immediately following. For that, I think we could just keep the current
behavior of transitioning to the Restarting state and then back to the
Waiting for Resources state. This state will protect us against scale
ups immediately following a failure/restart.
We should also think about how it relates to the externalized
declarative resource management. Should we always rescale immediately?
Should we wait until the cooldown is over?
It relates to point 2, no? We should rescale immediately only if the last
rescale was done more than scaling-interval.min ago; otherwise, schedule a
rescale at last-rescale + scaling-interval.min.
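The rule above can be sketched as a small delay computation. RescaleDelay and untilNextRescale are hypothetical names, not Flink API; the sketch only assumes that the time of the last rescale is known.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper, not part of Flink. */
final class RescaleDelay {
    /**
     * Returns Duration.ZERO when scaling-interval.min has already elapsed since the
     * last rescale (rescale immediately), otherwise the time remaining until
     * last-rescale + scaling-interval.min (schedule the rescale for then).
     */
    static Duration untilNextRescale(Instant lastRescale, Instant now, Duration scalingIntervalMin) {
        Instant earliestNextRescale = lastRescale.plus(scalingIntervalMin);
        return now.isBefore(earliestNextRescale)
                ? Duration.between(now, earliestNextRescale)
                : Duration.ZERO;
    }
}
```

For example, with scaling-interval.min set to 30 seconds, a slot change arriving 10 seconds after the last rescale yields a 20 second delay, while one arriving 45 seconds after it yields zero.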
Related to this, there's the min-parallelism-increase option, that if
for example set to "2" restricts rescale operations to only occur if
the parallelism increases by at least 2.
yes I saw that in the code
Ideally however there would be a max timeout for this.
As such we could maybe think about this a bit differently:
Add 2 new options instead of 1:
jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time
the scheduler will wait for the next effective rescale operations.
jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time
the scheduler will wait for the next effective rescale operations.
At point 2, we said that when slots change (requirements change or new
slots become available), if the last rescale check (call to maybeRescale)
was done less than scaling-interval.min ago, we should schedule a check
that will rescale if min-parallelism-increase is met. Then what is the use
of the scaling-interval.max timeout in that context?
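For discussion purposes, here is one possible reading of how the two options could interact with min-parallelism-increase. It is an assumption and has not been confirmed in this thread: scaling-interval.max would act as a timeout after which a parallelism increase smaller than min-parallelism-increase is accepted anyway. RescaleDecision and its parameter names are hypothetical.

```java
import java.time.Duration;

/** Hypothetical decision helper illustrating one unconfirmed reading of the options. */
final class RescaleDecision {
    static boolean shouldRescale(
            int currentParallelism,
            int availableParallelism,
            int minParallelismIncrease,
            Duration sinceSlotChange,
            Duration scalingIntervalMax) {
        int increase = availableParallelism - currentParallelism;
        if (increase <= 0) {
            return false; // nothing to gain from a scale up
        }
        if (increase >= minParallelismIncrease) {
            return true; // the usual min-parallelism-increase condition is met
        }
        // Assumed role of scaling-interval.max: once it has elapsed, stop
        // waiting for a bigger increase and rescale with what is available.
        return sinceSlotChange.compareTo(scalingIntervalMax) >= 0;
    }
}
```

Under this reading, scaling-interval.min still governs how soon a check may run, and scaling-interval.max only bounds how long a small increase can stay unused.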
3) It sounds fine that we lose the cooldown state, because imo we want
to reset the cooldown anyway on job failures (because a job failure
inherently implies a potential rescaling).
exactly.
4) The stabilization time isn't really redundant and serves a
different use-case. The idea behind it is that if a user adds multiple
TMs at once then we don't want to rescale immediately at the first
received slot. Without the stabilization time the cooldown would
actually cause bad behavior here, because not only would we rescale
immediately upon receiving the minimum required slots to scale up, but
we also wouldn't use the remaining slots just because the cooldown
says so.
I meant the opposite: not having only the cooldown but having only the
stabilization time. I must have missed something, because what I wonder
is: if every rescale entails a restart of the pipeline and every restart
entails passing through the waiting for resources state, then why
introduce a cooldown when there is already a stable resource timeout at
each rescale?
Best
Etienne
On 16/06/2023 15:47, Etienne Chauchot wrote:
Hi Robert,
Thanks for your feedback. I don't know the scheduler part well enough
yet and I'm taking this ticket as a learning workshop.
Regarding your comments:
1. Looking at the AdaptiveScheduler class, which takes all its
configuration from the JobManagerOptions, and to be consistent
with the names of other parameters, I'd suggest
/jobmanager.scheduler-scaling-cooldown-period/
2. I thought scaling events existed already and the scheduler
received them as mentioned in FLIP-160 (cf "Whenever the scheduler is
in the Executing state and receives new slots") or in FLIP-138 (cf
"Whenever new slots are available the SlotPool notifies the
Scheduler"). If that is not the case (it is the scheduler that asks for
slots), then there is indeed no need to store scaling requests.
=> I need a confirmation here
3. If we lose the JobManager, we lose both the AdaptiveScheduler
state and the CoolDownTimer state. So, upon recovery, it would be as
if there was no ongoing coolDown period. A first re-scale could
happen right away and would start a coolDown period. A second
re-scale would have to wait for the end of this period.
4. When a pipeline is re-scaled, it is restarted. Upon restart, the
AdaptiveScheduler passes through the "waiting for resources" state
again, as FLIP-160 suggests. If so, then it seems that the coolDown
period is somewhat redundant with the resource-stabilization-timeout.
I guess that is not the case, otherwise the FLINK-21883 ticket would
not have been created.
=> I need a confirmation here also.
Thanks for your views on point 2 and 4.
Best
Etienne
On 15/06/2023 13:35, Robert Metzger wrote:
Thanks for the FLIP.
Some comments:
1. Can you specify the full proposed configuration name?
"scaling-cooldown-period" is probably not the full config name?
2. Why is the concept of scaling events and a scaling queue needed?
If I remember correctly, the adaptive scheduler will just check how many
TaskManagers are available and then adjust the execution graph
accordingly.
There's no need to store a number of scaling events. We just need to
determine the time to trigger an adjustment of the execution graph.
3. What's the behavior wrt JobManager failures (e.g. we lose the state
of the Adaptive Scheduler)? My proposal would be to just reset the
cooldown period, so after recovery of a JobManager, we have to wait at
least for the cooldown period until further scaling operations are
done.
4. What's the relationship to the
"jobmanager.adaptive-scheduler.resource-stabilization-timeout"
configuration?
Thanks a lot for working on this!
Best,
Robert
On Wed, Jun 14, 2023 at 3:38 PM Etienne Chauchot <echauc...@apache.org> wrote:
Hi all,
@Yuxia, I updated the FLIP to include the aggregation of the stacked
operations that we discussed below. PTAL.
Best
Etienne
On 13/06/2023 16:31, Etienne Chauchot wrote:
Hi Yuxia,
Thanks for your feedback. The number of potentially stacked operations
depends on the configured length of the cooldown period.
The proposition in the FLIP is to add a minimum delay between 2 scaling
operations. But, indeed, an optimization could be to still stack the
operations that arrive during a cooldown period and, rather than taking
only the last one, aggregate them so as to end up with a single
aggregated operation when the cooldown period ends. For example, if
3 taskManagers come up and 1 goes down during the cooldown period, we
could generate a single scale-up operation of +2 when the period ends.
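The aggregation described above can be sketched as follows. SlotChangeAggregator is a hypothetical name, and representing each stacked operation as a signed taskManager delta (+1 for one coming up, -1 for one going down) is an assumption made for illustration.

```java
import java.util.List;

/** Hypothetical helper, not part of Flink. */
final class SlotChangeAggregator {
    /** Collapses the operations stacked during a cooldown period into one net change. */
    static int netChange(List<Integer> stackedDeltas) {
        int net = 0;
        for (int delta : stackedDeltas) {
            net += delta;
        }
        return net;
    }
}
```

With the example above, the stacked deltas +1, +1, +1 and -1 collapse into a single net change of +2.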
As a side note regarding your comment on "it'll take a long time to
finish all", please keep in mind that the reactive mode (at least for
now) is only available for streaming pipelines, which are in essence
infinite.
Another side note: when you mention "every taskManagers connecting",
if you are referring to the start of the pipeline, please keep in mind
that the adaptive scheduler has a "waiting for resources" timeout
period before starting the pipeline in which all taskmanagers connect
and the parallelism is decided.
Best
Etienne
On 13/06/2023 03:58, yuxia wrote:
Hi, Etienne. Thanks for driving it. I have one question about the
mechanism of the cooldown timeout.
From the Proposed Changes part, if a scaling event is received and
it falls during the cooldown period, it'll be stacked to be executed
after the period ends. Also, from the description of FLINK-21883[1],
the cooldown timeout is there to avoid rescaling the job very frequently,
because TaskManagers do not all connect at the same time.
So, is it possible that every taskmanager connecting will produce a
scaling event, so that many scale up events get stacked and it takes
a long time to finish them all? Can we just take the last event?
[1]:https://issues.apache.org/jira/browse/FLINK-21883
Best regards, Yuxia
----- Original Message -----
From: "Etienne Chauchot"<echauc...@apache.org>
To: "dev"<dev@flink.apache.org>, "Robert Metzger"<metrob...@gmail.com>
Sent: Monday, June 12, 2023, 11:34:25 PM
Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
Hi,
I’d like to start a discussion about FLIP-322 [1] which introduces a
cooldown period for the adaptive scheduler.
I'd like to get your feedback especially @Robert as you opened the
related ticket and worked on the reactive mode a lot.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
Best
Etienne