Hi all,

Based on the discussion, I added a new configuration:
*jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*.
We considered the following options for the default value:

   1. Use a separate default value, e.g., 60s.
   2. Fall back to
   *jobmanager.adaptive-scheduler.resource-stabilization-timeout*.
   3. Use the value from
   *jobmanager.adaptive-scheduler.scaling-interval.max*.
   4. Use a large number like Duration.ofMillis(Long.MAX_VALUE).

We decided against option 2 because, as discussed on the mailing list, that
value can be too low. Option 3 was also ruled out since the value can be too
high or unset, and *scaling-interval.max* serves a different use case (it
works well with *parallelism-increase*). Option 4 was not chosen because it
would change the behavior of existing jobs after migration: with the new
Flink version, rescaling would only happen once the desired resources were
available, whereas before migration rescaling happened with every resource
change.

Therefore, I prefer a new default value: 60s.
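
A rough sketch of how the 60s default and the rescale decision could fit
together. This is only an illustration: the class, the helper names, and the
plain Map standing in for Flink's configuration are hypothetical, and the
decision rule (rescale as soon as the desired resources are available,
otherwise only after the timeout and with sufficient resources) is an
assumption based on this thread, not the actual AdaptiveScheduler code.

```java
import java.time.Duration;
import java.util.Map;

public class StabilizationTimeoutSketch {

    static final String EXECUTING_TIMEOUT_KEY =
            "jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout";

    // Option 1 from the list above: a separate default value of 60s.
    static final Duration DEFAULT_EXECUTING_TIMEOUT = Duration.ofSeconds(60);

    // Hypothetical helper: a plain Map stands in for Flink's configuration.
    static Duration resolveExecutingTimeout(Map<String, Duration> config) {
        return config.getOrDefault(EXECUTING_TIMEOUT_KEY, DEFAULT_EXECUTING_TIMEOUT);
    }

    // Assumed rule: rescale right away when the desired resources are there;
    // otherwise wait for the stabilization timeout and require at least
    // sufficient resources.
    static boolean shouldRescale(
            boolean desiredAvailable,
            boolean sufficientAvailable,
            Duration sinceFirstChange,
            Duration timeout) {
        if (desiredAvailable) {
            return true;
        }
        return sufficientAvailable && sinceFirstChange.compareTo(timeout) >= 0;
    }

    public static void main(String[] args) {
        Duration withDefault = resolveExecutingTimeout(Map.of());
        Duration option4 = Duration.ofMillis(Long.MAX_VALUE);
        Duration elapsed = Duration.ofMinutes(5);

        // With the 60s default, a partial resource change is picked up after a minute.
        System.out.println(shouldRescale(false, true, elapsed, withDefault));
        // With option 4, only the desired resources would ever trigger a rescale.
        System.out.println(shouldRescale(false, true, elapsed, option4));
    }
}
```

The second call shows why option 4 would change behavior for migrated jobs:
with an effectively infinite timeout, a partial resource change never leads
to a rescale.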


Additionally, we reviewed the current set of parameters and think there is
a chance to align the parameter names with the functionality as part of the
2.0 release. So, we propose to have these parameters:
*jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout*
*jobmanager.adaptive-scheduler.submission.resource-wait-timeout*

*jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling*
*jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*
*jobmanager.adaptive-scheduler.executing.rescale-trigger.max-checkpoint-failures*
*jobmanager.adaptive-scheduler.executing.rescale-trigger.max-delay*
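
To illustrate the naming scheme, the small sketch below groups the proposed
keys by the segment that follows the common prefix. The class and method
names are hypothetical and exist only for this illustration; the key names
are the ones listed above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ProposedKeysSketch {

    static final String PREFIX = "jobmanager.adaptive-scheduler.";

    // The six keys proposed in this mail.
    static final List<String> PROPOSED_KEYS = List.of(
            PREFIX + "submission.resource-stabilization-timeout",
            PREFIX + "submission.resource-wait-timeout",
            PREFIX + "executing.cooldown-after-rescaling",
            PREFIX + "executing.resource-stabilization-timeout",
            PREFIX + "executing.rescale-trigger.max-checkpoint-failures",
            PREFIX + "executing.rescale-trigger.max-delay");

    // Groups keys by the first segment after the common prefix
    // ("submission" or "executing").
    static Map<String, List<String>> groupByPhase(List<String> keys) {
        return keys.stream().collect(Collectors.groupingBy(
                key -> key.substring(PREFIX.length()).split("\\.")[0],
                TreeMap::new,
                Collectors.toList()));
    }

    public static void main(String[] args) {
        groupByPhase(PROPOSED_KEYS).forEach((phase, keys) -> {
            System.out.println(phase + ":");
            keys.forEach(key -> System.out.println("  " + key));
        });
    }
}
```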

Link to the updated FLIP doc.
<https://docs.google.com/document/d/1YeYSs64LqgUr3xyBTCjiRE-CT5VEyHjGjqxnxKPIQhM/edit>

Thanks a lot.

Regards,
Zdenek

On Wed, Jul 24, 2024 at 2:22 PM Zdenek Tison <zti...@confluent.io> wrote:

> Hi Gyula,
>
> Thank you for reviewing the document and providing feedback.
>
>    1. I agree that we need two separate parameters for stabilization
>    intervals in different states. I will update the FLIP document accordingly.
>    2. That's correct. We reached the same conclusion while prototyping
>    the implementation. I will add a new bullet point to the FLIP document.
>
> Thanks a lot.
>
> Regards,
> Zdenek
>
>
> On Tue, Jul 23, 2024 at 3:02 PM Gyula Fóra <gyf...@apache.org> wrote:
>
>> Hi All!
>>
>> Thank you for the proposal, I think it will be great to simplify the
>> current rescaling flow to make it more digestible :)
>>
>> I have 2 comments:
>>
>> 1. Related to what Matthias already pointed out, I think in production
>> scenarios it may be a typical requirement to have a fairly short
>> stabilization interval for job startup (reduce downtime) but overall a
>> longer stabilization period for Executing jobs before rescaling to avoid
>> fluctuations and therefore reduce downtime. I think it would be very
>> important to have 2 configs for that, one could fall back to the other of
>> course if undefined.
>>
>> 2. The document mentions that the stabilization period for executing jobs
>> is measured from the first resource event. I feel that if after the
>> stabilization period we don't have sufficient resources we should
>> completely
>> reset this timer and start the timeout from 0 when the next event arrives.
>> This will be more in line with the concept of stabilization, otherwise if
>> you receive a batch of new resources you may not utilize it because as
>> soon
>> as you have sufficient we rescale immediately.
>>
>> Cheers,
>> Gyula
>>
>>
>>
>> On Thu, Jul 18, 2024 at 9:58 AM Zdenek Tison <zti...@confluent.io.invalid
>> >
>> wrote:
>>
>> > Thanks, Matthias, for your opinions.
>> >
>> > I see two scenarios where different values for starting and rescaling
>> would
>> > be appropriate:
>> >
>> > 1) Flink serverless providers may prefer the fastest possible job
>> startup
>> > time, which can also be achieved by setting a smaller value for the
>> > stabilization timeout, such as 1 second, in the WaitingForResources
>> state.
>> > Conversely, to ensure maximum job uptime, it would be prudent to
>> increase
>> > the stabilization period for rescaling to a higher value, such as 1
>> minute,
>> > to handle server/node maintenance effectively.
>> >
>> > 2) In Reactive mode, the stabilization period is set to 0 by default.
>> > Setting a different default value for the rescale state could enhance
>> job
>> > stability during node maintenance, especially since the parameter
>> > min-parallelism-increase is no longer applicable.
>> >
>> > Regards,
>> >
>> > Zdenek
>> >
>> > On Tue, Jul 16, 2024 at 5:49 PM Matthias Pohl <map...@apache.org>
>> wrote:
>> >
>> > > Thanks Zdenek for your proposal on aligning the resource control logic
>> > > within the AdaptiveScheduler and cleaning up the rescaling code.
>> > >
>> > > Consolidating the parameters and the code as part of the 2.0 release
>> > makes
>> > > sense in my opinion: The proposed change adds consistent behavior to
>> the
>> > > WaitingForResources and Executing states of the AdaptiveScheduler and
>> > irons
>> > > out some flaws of the current implementation. This should help users
>> get
>> > a
>> > > clearer picture of the resource control logic. Removing obsolete
>> rescale
>> > > waiting time if only sufficient resources are available is also a nice
>> > > improvement.
>> > >
>> > > The j.a.min-parallelism-increase [1] parameter became kind of obsolete
>> > with
>> > > the introduction of the rescale REST endpoint in FLIP-291 [2] as you
>> > > pointed out in the FLIP. So, deprecating it sounds reasonable.
>> > >
>> > > On the topic of replacing the j.a.scaling-interval.max parameter [3]
>> with
>> > > the j.a.resource-stabilization-timeout [4]: I'm in favor of reducing
>> the
>> > > complexity of the Flink configuration. Therefore, using one parameter
>> for
>> > > both (WaitingForResources and Executing state) to stabilize the
>> resources
>> > > sounds like a good idea.
>> > >
>> > > I'm wondering whether there are scenarios, where we would want to have
>> > > different stabilization timeouts for starting (WaitingForResources)
>> and
>> > > rescaling (Executing) a job. In that case, having two resource
>> > > stabilization parameters (one for job starts and one for rescales) with
>> one
>> > > being the fallback for the other is a straight-forward solution.
>> > >
>> > > Just as a side note because it came up: Keep in mind that FLIP-461
>> still
>> > > allows for immediate rescaling on a change event if checkpointing is
>> > > disabled or j.a.max-delay-for-scale-trigger [5] is configured
>> > accordingly.
>> > >
>> > > Best,
>> > > Matthias
>> > >
>> > > [1]
>> > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-min-parallelism-increase
>> > > [2]
>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
>> > > [3]
>> > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max
>> > > [4]
>> > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout
>> > > [5]
>> > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-max-delay-for-scale-trigger
>> > >
>> > >
>> > >
>> > > On Tue, Jul 16, 2024 at 3:05 PM Zdenek Tison
>> <zti...@confluent.io.invalid
>> > >
>> > > wrote:
>> > >
>> > > > Hi, I'd like to move a discussion from Google Docs to the mailing
>> list
>> > so
>> > > > that it's visible to everyone.
>> > > >
>> > > > *Yuanfeng Hu* brought up two concerns:
>> > > >
>> > > > 1) Regarding the resource-stabilization-timeout, he thinks 10s may be
>> > > > too short. In a container environment, if the number of TMs added by
>> > > > REST requests is greater than 1, the TM initialization time may be
>> > > > much longer than 10s.
>> > > >
>> > > > and
>> > > >
>> > > > 2) He proposed a small scenario:
>> > > > There is 1 slot in the entire cluster, and my job is running at a
>> > > > parallelism of 1 (the required number of slots is also 1). Then I add
>> > > > a TM (1 slot), which will obviously trigger a change event, and the
>> > > > resources will become stable after 10 seconds. If I change the
>> > > > required resources to 3 through REST at this time, a rescale will be
>> > > > triggered immediately and the job will run at a parallelism of 2. Is
>> > > > this the expected result, or do we expect the rescale to be triggered
>> > > > only after adding another TM, because that exactly matches the
>> > > > required resources?
>> > > >
>> > > > Thank you, *Yuanfeng Hu*, for opening the discussion.
>> > > >
>> > > >
>> > > >
>> > >
>> >
>> ---------------------------------------------------------------------------------------
>> > > >
>> > > > 1) Regarding the stabilization period:
>> > > >
>> > > > I am unsure what you mean by the part, 'if the number of tm added by
>> > rest
>> > > > requests is greater than 1.' However, I understand that it can take
>> > some
>> > > > time to spawn additional containers/pods in a containerized
>> > environment.
>> > > On
>> > > > the other hand, if a user adds more TMs, for instance, by increasing
>> > the
>> > > > number of replicas in a Kubernetes deployment, these replicas should
>> > > appear
>> > > > with some delay but at a similar time, correct?
>> > > >
>> > > > It's worth mentioning that since FLIP-461
>> > > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler>,
>> > > > the rescale operation is synchronized with checkpoint events, so the
>> > > > rescale doesn't happen right after this timeout expires.
>> > > >
>> > > > If we believe it is necessary to have different values for the
>> > > > stabilization period in the Executing and WaitingForResources states,
>> > > > even though this increases configuration complexity slightly, we could
>> > > > have separate parameters for these two states:
>> > > > jobmanager.adaptive-scheduler.resource-stabilization-timeout
>> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout>
>> > > > and *jobmanager.adaptive-scheduler.scaling-stabilization-timeout*
>> > > > (replacing the jobmanager.adaptive-scheduler.scaling-interval.max
>> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max>).
>> > > >
>> > > >
>> > > > *2) *Regarding the proposed scenario:
>> > > >
>> > > > The same behavior occurs in the current Flink version when the
>> > > > `min-parallelism-increase` is set to its default value 1. In this
>> case,
>> > > the
>> > > > rescale operation is triggered immediately or aligned with the
>> > checkpoint
>> > > > event (specified in FLIP-461).
>> > > > So, I would say the behavior is expected.
>> > > > Additionally, users can configure the rescaling behavior. For
>> example,
>> > > if a
>> > > > user sets the lower bound parallelism to 2 and the upper bound to 3,
>> > the
>> > > > system will rescale after 10 seconds. Alternatively, if the user
>> sets
>> > the
>> > > > same value for the lower and upper bounds, the rescale operation
>> will
>> > > wait
>> > > > until all slots are available.
>> > > >
>> > > > Best Regards,
>> > > > Zdenek Tison
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On Thu, Jul 11, 2024 at 2:38 PM Zdenek Tison <zti...@confluent.io>
>> > > wrote:
>> > > >
>> > > > > Hello,
>> > > > >
>> > > > > Our team has been working on several improvements for
>> > > AdaptiveScheduler,
>> > > > > specifically focusing on aligning logic and timeouts in the
>> > > > > WaitingForResources and Executing states. We believe these
>> > enhancements
>> > > > > will improve the adaptive scheduler's robustness and
>> maintainability.
>> > > > >
>> > > > > For more detailed information, please refer to the FLIP document.
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://docs.google.com/document/d/1YeYSs64LqgUr3xyBTCjiRE-CT5VEyHjGjqxnxKPIQhM/edit?usp=sharing
>> > > > >
>> > > > > Thanks,
>> > > > > Zdenek Tison
>> > > > >
>> > > >
>> > >
>> >
>>
>
