Hi,

If there are no further comments, I would propose starting a vote on these
changes. But first, I would like to ask a committer to migrate the draft to
a FLIP in the Flink Wiki.

Thanks a lot.

Kind Regards,

Zdenek

On Tue, Jul 30, 2024 at 10:36 AM Zdenek Tison <zti...@confluent.io> wrote:

> Hi all,
>
> Based on the discussion, I added a new configuration:
> *jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*.
> We considered the following options for the default value:
>
>    1. Use a separate default value, e.g., 60s.
>    2. Fall back to
>    *jobmanager.adaptive-scheduler.resource-stabilization-timeout*.
>    3. Use the value from
>    *jobmanager.adaptive-scheduler.scaling-interval.max*.
>    4. Use a large number like Duration.ofMillis(Long.MAX_VALUE).
>
> We decided against option 2 because, as discussed on the mailing list,
> the value can be too low. Option 3 was also ruled out since it can be too
> high or unset, and *scaling-interval.max* serves a different use case (it
> works well with *parallelism-increase*). Option 4 was not chosen because
> it would change the behavior of existing jobs after migration: in the new
> Flink version, rescaling would only happen once the desired resources were
> available, whereas before migration it happened with every resource change.
>
> Therefore, I prefer a new default value: 60s.
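
To make option 1 concrete, here is a minimal sketch in plain Java. It deliberately does not use Flink's Configuration API: the class name, the map-based config, and the resolve method are hypothetical and only illustrate that an unset key resolves to a dedicated 60s default instead of falling back to another option.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

/** Illustrative stand-in only; this is not Flink's configuration code. */
final class ExecutingStabilizationTimeout {
    // Key proposed in this thread.
    static final String EXECUTING_KEY =
            "jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout";

    // Option 1: a dedicated default value.
    static final Duration DEFAULT = Duration.ofSeconds(60);

    /** Returns the explicitly configured value, or the dedicated default if unset. */
    static Duration resolve(Map<String, Duration> config) {
        return Optional.ofNullable(config.get(EXECUTING_KEY)).orElse(DEFAULT);
    }

    public static void main(String[] args) {
        // Unset key: the dedicated default applies.
        System.out.println(resolve(Map.of()));
        // Explicit value: the user's setting wins.
        System.out.println(resolve(Map.of(EXECUTING_KEY, Duration.ofSeconds(10))));
    }
}
```

With this shape, a job migrated without any explicit setting would get the 60s period, and nothing depends on the submission-side timeout or on scaling-interval.max.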
>
>
> Additionally, we reviewed the current set of parameters and think there is
> a chance to align the parameters with the functionality in the 2.0
> release. So, we propose the following parameters:
> *jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout*
> *jobmanager.adaptive-scheduler.submission.resource-wait-timeout*
>
> *jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling*
> *jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*
>
> *jobmanager.adaptive-scheduler.executing.rescale-trigger.max-checkpoint-failures*
> *jobmanager.adaptive-scheduler.executing.rescale-trigger.max-delay*
>
> Link to the updated FLIP doc.
> <https://docs.google.com/document/d/1YeYSs64LqgUr3xyBTCjiRE-CT5VEyHjGjqxnxKPIQhM/edit>
>
> Thanks a lot.
>
> Regards,
> Zdenek
>
> On Wed, Jul 24, 2024 at 2:22 PM Zdenek Tison <zti...@confluent.io> wrote:
>
>> Hi Gyula,
>>
>> Thank you for reviewing the document and providing feedback.
>>
>>    1. I agree that we need two separate parameters for stabilization
>>    intervals in different states. I will update the FLIP document
>>    accordingly.
>>    2. That's correct. We reached the same conclusion while prototyping
>>    the implementation. I will add a new bullet point to the FLIP document.
>>
>> Thanks a lot.
>>
>> Regards,
>> Zdenek
>>
>>
>> On Tue, Jul 23, 2024 at 3:02 PM Gyula Fóra <gyf...@apache.org> wrote:
>>
>>> Hi All!
>>>
>>> Thank you for the proposal, I think it will be great to simplify the
>>> current rescaling flow to make it more digestible :)
>>>
>>> I have 2 comments:
>>>
>>> 1. Related to what Matthias already pointed out, I think in production
>>> scenarios it may be a typical requirement to have a fairly short
>>> stabilization interval for job startup (reduce downtime) but overall a
>>> longer stabilization period for Executing jobs before rescaling to avoid
>>> fluctuations and therefore reduce downtime. I think it would be very
>>> important to have 2 configs for that, one could fall back to the other of
>>> course if undefined.
>>>
>>> 2. The document mentions that the stabilization period for executing jobs
>>> is measured from the first resource event. I feel that if we don't have
>>> sufficient resources after the stabilization period, we should completely
>>> reset this timer and start the timeout from 0 when the next event arrives.
>>> This will be more in line with the concept of stabilization; otherwise, if
>>> you receive a batch of new resources, you may not utilize it because as
>>> soon as you have sufficient resources we rescale immediately.
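
The reset behavior described in point 2 could be sketched roughly as follows. This is a simplified illustration in plain Java, not the AdaptiveScheduler implementation: the class and method names are hypothetical, time is passed in explicitly, and checkpoint alignment (FLIP-461) is ignored.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of a stabilization timer that resets when it expires. */
final class StabilizationTimer {
    private final Duration timeout;
    private Instant startedAt; // null while no stabilization period is running

    StabilizationTimer(Duration timeout) {
        this.timeout = timeout;
    }

    /** The first resource event starts the period; later events do not extend it. */
    void onResourceEvent(Instant now) {
        if (startedAt == null) {
            startedAt = now;
        }
    }

    boolean isRunning() {
        return startedAt != null;
    }

    /**
     * Returns true only if the period has elapsed and resources are sufficient.
     * Once the period has elapsed the timer is cleared in either case, so with
     * insufficient resources the next resource event starts again from zero.
     */
    boolean shouldRescale(Instant now, boolean sufficientResources) {
        if (startedAt == null || now.isBefore(startedAt.plus(timeout))) {
            return false;
        }
        startedAt = null;
        return sufficientResources;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.EPOCH;
        StabilizationTimer timer = new StabilizationTimer(Duration.ofSeconds(60));
        timer.onResourceEvent(t0);
        // Period elapsed without sufficient resources: no rescale, timer is reset.
        System.out.println(timer.shouldRescale(t0.plusSeconds(60), false));
        System.out.println(timer.isRunning());
    }
}
```

Because the timer is cleared on expiry, a batch of resources arriving later gets a full stabilization period of its own rather than triggering a rescale on the first sufficient slot.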
>>>
>>> Cheers,
>>> Gyula
>>>
>>>
>>>
>>> On Thu, Jul 18, 2024 at 9:58 AM Zdenek Tison <zti...@confluent.io.invalid>
>>> wrote:
>>>
>>> > Thanks, Matthias, for your opinions.
>>> >
>>> > I see two scenarios where different values for starting and rescaling
>>> > would be appropriate:
>>> >
>>> > 1) Flink serverless providers may prefer the fastest possible job startup
>>> > time, which can also be achieved by setting a smaller value for the
>>> > stabilization timeout, such as 1 second, in the WaitingForResources state.
>>> > Conversely, to ensure maximum job uptime, it would be prudent to increase
>>> > the stabilization period for rescaling to a higher value, such as 1 minute,
>>> > to handle server/node maintenance effectively.
>>> >
>>> > 2) In Reactive mode, the stabilization period is set to 0 by default.
>>> > Setting a different default value for the rescale state could enhance job
>>> > stability during node maintenance, especially since the parameter
>>> > min-parallelism-increase is no longer applicable.
>>> >
>>> > Regards,
>>> >
>>> > Zdenek
>>> >
>>> > On Tue, Jul 16, 2024 at 5:49 PM Matthias Pohl <map...@apache.org> wrote:
>>> >
>>> > > Thanks Zdenek for your proposal on aligning the resource control logic
>>> > > within the AdaptiveScheduler and cleaning up the rescaling code.
>>> > >
>>> > > Consolidating the parameters and the code as part of the 2.0 release
>>> > > makes sense in my opinion: The proposed change adds consistent behavior
>>> > > to the WaitingForResources and Executing states of the AdaptiveScheduler
>>> > > and irons out some flaws of the current implementation. This should help
>>> > > users get a clearer picture of the resource control logic. Removing the
>>> > > obsolete rescale waiting time if only sufficient resources are available
>>> > > is also a nice improvement.
>>> > >
>>> > > The j.a.min-parallelism-increase [1] parameter became kind of obsolete
>>> > > with the introduction of the rescale REST endpoint in FLIP-291 [2] as you
>>> > > pointed out in the FLIP. So, deprecating it sounds reasonable.
>>> > >
>>> > > On the topic of replacing the j.a.scaling-interval.max parameter [3] with
>>> > > the j.a.resource-stabilization-timeout [4]: I'm in favor of reducing the
>>> > > complexity of the Flink configuration. Therefore, using one parameter for
>>> > > both (WaitingForResources and Executing state) to stabilize the resources
>>> > > sounds like a good idea.
>>> > >
>>> > > I'm wondering whether there are scenarios where we would want to have
>>> > > different stabilization timeouts for starting (WaitingForResources) and
>>> > > rescaling (Executing) a job. In that case, having two resource
>>> > > stabilization parameters (one for job starts and one for rescales) with
>>> > > one being the fallback for the other is a straightforward solution.
>>> > >
>>> > > Just as a side note because it came up: Keep in mind that FLIP-461 still
>>> > > allows for immediate rescaling on a change event if checkpointing is
>>> > > disabled or j.a.max-delay-for-scale-trigger [5] is configured accordingly.
>>> > >
>>> > > Best,
>>> > > Matthias
>>> > >
>>> > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-min-parallelism-increase
>>> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
>>> > > [3] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max
>>> > > [4] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout
>>> > > [5] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-max-delay-for-scale-trigger
>>> > >
>>> > >
>>> > >
>>> > > On Tue, Jul 16, 2024 at 3:05 PM Zdenek Tison <zti...@confluent.io.invalid>
>>> > > wrote:
>>> > >
>>> > > > Hi, I'd like to move a discussion from Google Docs to the mailing list
>>> > > > so that it's visible to everyone.
>>> > > >
>>> > > > *Yuanfeng Hu* brought up two concerns:
>>> > > >
>>> > > > 1) Related to the resource-stabilization-timeout, he thinks 10s may be
>>> > > > too short. In a container environment, if the number of tm added by rest
>>> > > > requests is greater than 1, the tm initialization time may be much longer
>>> > > > than 10s.
>>> > > >
>>> > > > and
>>> > > >
>>> > > > 2) He proposed a little scenario:
>>> > > > There is 1 slot in the entire cluster. At this time, my task is running
>>> > > > at a parallelism of 1 (the required slot count is also 1). Then I add a
>>> > > > tm (1 slot), which will obviously trigger a change event, and it will
>>> > > > become stable after 10 seconds. If I change the required resources to 3
>>> > > > through rest at this time, a rescale will be triggered immediately and
>>> > > > the job runs at a parallelism of 2. Is this the expected result, or do we
>>> > > > expect that the rescale will be triggered after adding another tm,
>>> > > > because this exactly matches the required resources?
>>> > > > Thank you, *Yuanfeng Hu*, for opening the discussion.
>>> > > >
>>> > > > ---------------------------------------------------------------------------------------
>>> > > >
>>> > > > 1) Regarding the stabilization period:
>>> > > >
>>> > > > I am unsure what you mean by the part, 'if the number of tm added by rest
>>> > > > requests is greater than 1.' However, I understand that it can take some
>>> > > > time to spawn additional containers/pods in a containerized environment.
>>> > > > On the other hand, if a user adds more TMs, for instance, by increasing
>>> > > > the number of replicas in a Kubernetes deployment, these replicas should
>>> > > > appear with some delay but at a similar time, correct?
>>> > > >
>>> > > > It's worth mentioning that since FLIP-461
>>> > > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler>,
>>> > > > the rescale operation is synchronized with checkpoint events, so the
>>> > > > rescale doesn't happen right after this timeout expires.
>>> > > >
>>> > > > If we believe it is necessary to have different values for the
>>> > > > stabilization period in the Executing and WaitingForResources states,
>>> > > > even though this increases configuration complexity slightly, we could
>>> > > > have separate parameters for these two states:
>>> > > > jobmanager.adaptive-scheduler.resource-stabilization-timeout
>>> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout>
>>> > > > and *jobmanager.adaptive-scheduler.scaling-stabilization-timeout*
>>> > > > (replacing the jobmanager.adaptive-scheduler.scaling-interval.max
>>> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max>).
>>> > > >
>>> > > >
>>> > > > 2) Regarding the proposed scenario:
>>> > > >
>>> > > > The same behavior occurs in the current Flink version when
>>> > > > `min-parallelism-increase` is set to its default value of 1. In this
>>> > > > case, the rescale operation is triggered immediately or aligned with the
>>> > > > checkpoint event (as specified in FLIP-461).
>>> > > > So, I would say the behavior is expected.
>>> > > > Additionally, users can configure the rescaling behavior. For example,
>>> > > > if a user sets the lower bound parallelism to 2 and the upper bound to 3,
>>> > > > the system will rescale after 10 seconds. Alternatively, if the user sets
>>> > > > the same value for the lower and upper bounds, the rescale operation will
>>> > > > wait until all slots are available.
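
The two bound configurations above can be read as the following decision, sketched in plain Java. This is an assumption-laden simplification, not scheduler code: the names are hypothetical, "desired" is taken to mean the upper bound and "sufficient" the lower bound, and both checkpoint alignment (FLIP-461) and the job's current parallelism are ignored.

```java
/** Hypothetical sketch of the rescale decision for given parallelism bounds. */
final class RescaleDecision {
    enum Action { RESCALE_NOW, RESCALE_AFTER_STABILIZATION, WAIT }

    static Action decide(int availableSlots, int lowerBound, int upperBound) {
        if (availableSlots >= upperBound) {
            // Desired resources are available: no need to wait for stabilization.
            return Action.RESCALE_NOW;
        }
        if (availableSlots >= lowerBound) {
            // Only sufficient resources: rescale once the stabilization timeout expires.
            return Action.RESCALE_AFTER_STABILIZATION;
        }
        // Not even the lower bound is met: keep waiting for more slots.
        return Action.WAIT;
    }

    public static void main(String[] args) {
        // Two slots available, bounds [2, 3]: rescale after the stabilization period.
        System.out.println(decide(2, 2, 3));
        // Two slots available, bounds [3, 3]: wait until all slots are available.
        System.out.println(decide(2, 3, 3));
    }
}
```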
>>> > > >
>>> > > > Best Regards,
>>> > > > Zdenek Tison
>>> > > >
>>> > > >
>>> > > >
>>> > > >
>>> > > > On Thu, Jul 11, 2024 at 2:38 PM Zdenek Tison <zti...@confluent.io>
>>> > > > wrote:
>>> > > >
>>> > > > > Hello,
>>> > > > >
>>> > > > > Our team has been working on several improvements for the
>>> > > > > AdaptiveScheduler, specifically focusing on aligning logic and timeouts
>>> > > > > in the WaitingForResources and Executing states. We believe these
>>> > > > > enhancements will improve the adaptive scheduler's robustness and
>>> > > > > maintainability.
>>> > > > >
>>> > > > > For more detailed information, please refer to the FLIP document.
>>> > > > >
>>> > > > > https://docs.google.com/document/d/1YeYSs64LqgUr3xyBTCjiRE-CT5VEyHjGjqxnxKPIQhM/edit?usp=sharing
>>> > > > >
>>> > > > > Thanks,
>>> > > > > Zdenek Tison
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
