Re: [VOTE] FLIP-461: Synchronize rescaling with checkpoint creation to minimize reprocessing for the AdaptiveScheduler

2024-06-17 Thread Zdenek Tison
+1 (non-binding)

Best,
Zdenek

On Mon, Jun 17, 2024 at 10:24 AM Matthias Pohl  wrote:

> Hi everyone,
> the discussion in [1] about FLIP-461 [2] is kind of concluded. I am
> starting a vote on this one here.
>
> The vote will be open for at least 72 hours (i.e. until June 20, 2024;
> 8:30am UTC) unless there are any objections. The FLIP will be considered
> accepted if 3 binding votes (from active committers according to the Flink
> bylaws [3]) are gathered by the community.
>
> Best,
> Matthias
>
> [1] https://lists.apache.org/thread/nnkonmsv8xlk0go2sgtwnphkhrr5oc3y
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler
> [3]
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws#FlinkBylaws-Approvals
>


[DISCUSS] FLIP-XXX: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-07-11 Thread Zdenek Tison
Hello,

Our team has been working on several improvements for AdaptiveScheduler,
specifically focusing on aligning logic and timeouts in the
WaitingForResources and Executing states. We believe these enhancements
will improve the adaptive scheduler's robustness and maintainability.

For more detailed information, please refer to the FLIP document.
https://docs.google.com/document/d/1YeYSs64LqgUr3xyBTCjiRE-CT5VEyHjGjqxnxKPIQhM/edit?usp=sharing

Thanks,
Zdenek Tison


Re: [DISCUSS] FLIP-XXX: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-07-16 Thread Zdenek Tison
Hi, I'd like to move a discussion from Google Docs to the mailing list so
that it's visible to everyone.

*Yuanfeng Hu* brought up two concerns:

1) Regarding the resource-stabilization-timeout, he thinks 10s may be too
short. In a container environment, if the number of TMs added by REST
requests is greater than 1, the TM initialization time may be much longer
than 10s.

and

2) He proposed a small scenario:
There is 1 slot in the entire cluster, and my task is running at a
parallelism of 1 (the required slot count is also 1). Then I add a TM (1 slot),
which will obviously trigger a change event, and the resources will become
stable after 10 seconds. If I change the required resources to 3 through REST
at this time, a rescale will be triggered immediately and the job will run at
a parallelism of 2. Is this the expected result, or do we expect the rescale
to be triggered only after adding another TM, because that exactly matches
the required resources?

Thank you, *Yuanfeng Hu*, for opening the discussion.

---

1) Regarding the stabilization period:

I am unsure what you mean by the part, 'if the number of tm added by rest
requests is greater than 1.' However, I understand that it can take some
time to spawn additional containers/pods in a containerized environment. On
the other hand, if a user adds more TMs, for instance, by increasing the
number of replicas in a Kubernetes deployment, these replicas should appear
with some delay but at a similar time, correct?

It's worth mentioning that since FLIP-461
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler>,
the rescale operation is synchronized with checkpoint events, so the rescale
doesn't happen right after this timeout expires.

If we believe it is necessary to have different values for the
stabilization period in the Executing and WaitingForResources states, even
though this increases configuration complexity slightly, we could have
separate parameters for these two states:
jobmanager.adaptive-scheduler.resource-stabilization-timeout
<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout>
 and *jobmanager.adaptive-scheduler.scaling-stabilization-timeout* (replacing
the jobmanager.adaptive-scheduler.scaling-interval.max
<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max>
).


*2) *Regarding the proposed scenario:

The same behavior occurs in the current Flink version when the
`min-parallelism-increase` is set to its default value 1. In this case, the
rescale operation is triggered immediately or aligned with the checkpoint
event (specified in FLIP-461).
So, I would say the behavior is expected.
Additionally, users can configure the rescaling behavior. For example, if a
user sets the lower bound parallelism to 2 and the upper bound to 3, the
system will rescale after 10 seconds. Alternatively, if the user sets the
same value for the lower and upper bounds, the rescale operation will wait
until all slots are available.
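
The two configurations above can be sketched as follows. This is a minimal, simplified model and not Flink code: the class, enum, and method names are hypothetical, "desired" is assumed to mean the upper bound and "sufficient" the lower bound, and checkpoint alignment (FLIP-461) and cooldown are ignored.

```java
/** Hypothetical, simplified model of the rescale decision; not Flink code. */
final class RescaleDecisionSketch {

    enum Decision {
        /** Assumption: the upper bound (desired resources) is reachable, so no stabilization wait is needed. */
        RESCALE_WITHOUT_WAITING,
        /** Only the lower bound (sufficient resources) is reachable, so wait for the stabilization timeout. */
        RESCALE_AFTER_STABILIZATION,
        /** Fewer slots than the lower bound are available, so keep waiting. */
        WAIT_FOR_MORE_RESOURCES
    }

    static Decision decide(int availableSlots, int lowerBound, int upperBound) {
        if (lowerBound < 1 || upperBound < lowerBound) {
            throw new IllegalArgumentException("expected 1 <= lowerBound <= upperBound");
        }
        if (availableSlots >= upperBound) {
            return Decision.RESCALE_WITHOUT_WAITING;
        }
        if (availableSlots >= lowerBound) {
            return Decision.RESCALE_AFTER_STABILIZATION;
        }
        return Decision.WAIT_FOR_MORE_RESOURCES;
    }

    public static void main(String[] args) {
        // Two slots, bounds [2, 3]: rescale once the stabilization timeout (10 s here) expires.
        System.out.println(decide(2, 2, 3));
        // Two slots, bounds [3, 3]: wait until all three slots are available.
        System.out.println(decide(2, 3, 3));
    }
}
```

With equal bounds there is no "sufficient but not desired" range, which is why the second case can only wait.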

Best regards,
Zdenek Tison






Re: [DISCUSS] FLIP-XXX: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-07-18 Thread Zdenek Tison
Thanks, Matthias, for your opinions.

I see two scenarios where different values for starting and rescaling would
be appropriate:

1) Flink serverless providers may prefer the fastest possible job startup
time, which can also be achieved by setting a smaller value for the
stabilization timeout, such as 1 second, in the WaitingForResources state.
Conversely, to ensure maximum job uptime, it would be prudent to increase
the stabilization period for rescaling to a higher value, such as 1 minute,
to handle server/node maintenance effectively.

2) In Reactive mode, the stabilization period is set to 0 by default.
Setting a different default value for the rescale state could enhance job
stability during node maintenance, especially since the parameter
min-parallelism-increase is no longer applicable.

Regards,

Zdenek

On Tue, Jul 16, 2024 at 5:49 PM Matthias Pohl  wrote:

> Thanks Zdenek for your proposal on aligning the resource control logic
> within the AdaptiveScheduler and cleaning up the rescaling code.
>
> Consolidating the parameters and the code as part of the 2.0 release makes
> sense in my opinion: The proposed change adds consistent behavior to the
> WaitingForResources and Executing states of the AdaptiveScheduler and irons
> out some flaws of the current implementation. This should help users get a
> clearer picture of the resource control logic. Removing obsolete rescale
> waiting time if only sufficient resources are available is also a nice
> improvement.
>
> The j.a.min-parallelism-increase [1] parameter became kind of obsolete with
> the introduction of the rescale REST endpoint in FLIP-291 [2] as you
> pointed out in the FLIP. So, deprecating it sounds reasonable.
>
> On the topic of replacing the j.a.scaling-interval.max parameter [3] with
> the j.a.resource-stabilization-timeout [4]: I'm in favor of reducing the
> complexity of the Flink configuration. Therefore, using one parameter for
> both (WaitingForResources and Executing state) to stabilize the resources
> sounds like a good idea.
>
> I'm wondering whether there are scenarios where we would want to have
> different stabilization timeouts for starting (WaitingForResources) and
> rescaling (Executing) a job. In that case, having two resource
> stabilization parameters (one for job starts and one for rescales) with one
> being the fallback for the other is a straightforward solution.
>
> Just as a side note because it came up: Keep in mind that FLIP-461 still
> allows for immediate rescaling on a change event if checkpointing is
> disabled or j.a.max-delay-for-scale-trigger [5] is configured accordingly.
>
> Best,
> Matthias
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-min-parallelism-increase
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
> [3]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max
> [4]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout
> [5]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-max-delay-for-scale-trigger
>
>

Re: [DISCUSS] FLIP-XXX: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-07-24 Thread Zdenek Tison
Hi Gyula,

Thank you for reviewing the document and providing feedback.

   1. I agree that we need two separate parameters for stabilization
   intervals in different states. I will update the FLIP document accordingly.
   2. That's correct. We reached the same conclusion while prototyping the
   implementation. I will add a new bullet point to the FLIP document.
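
One possible reading of the timer reset from point 2 is sketched below. It is only an illustration under stated assumptions: the class and method names are hypothetical and not part of Flink, and the caller is assumed to supply the current time and whether sufficient resources are available.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of a stabilization timer that restarts after an unsuccessful period. */
final class StabilizationTimerSketch {
    private final Duration timeout;
    private Instant periodStart; // null while no stabilization period is running

    StabilizationTimerSketch(Duration timeout) {
        this.timeout = timeout;
    }

    /** A resource event starts a new period only if none is currently running. */
    void onResourceEvent(Instant now) {
        if (periodStart == null) {
            periodStart = now;
        }
    }

    /**
     * Checks the timer at the given time. Once the period has elapsed, the timer is cleared
     * in either case, so the next resource event starts counting from zero again.
     *
     * @return true if the period has elapsed and sufficient resources are available
     */
    boolean checkAt(Instant now, boolean sufficientResources) {
        if (periodStart == null) {
            return false;
        }
        if (Duration.between(periodStart, now).compareTo(timeout) < 0) {
            return false; // still stabilizing
        }
        periodStart = null;
        return sufficientResources;
    }
}
```

Clearing the timer on expiry, rather than keeping the first event's timestamp, is what prevents an immediate rescale as soon as the first sufficient batch of resources shows up later.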

Thanks a lot.

Regards,
Zdenek


On Tue, Jul 23, 2024 at 3:02 PM Gyula Fóra  wrote:

> Hi All!
>
> Thank you for the proposal, I think it will be great to simplify the
> current rescaling flow to make it more digestible :)
>
> I have 2 comments:
>
> 1. Related to what Matthias already pointed out, I think in production
> scenarios it may be a typical requirement to have a fairly short
> stabilization interval for job startup (reduce downtime) but overall a
> longer stabilization period for Executing jobs before rescaling to avoid
> fluctuations and therefore reduce downtime. I think it would be very
> important to have 2 configs for that, one could fall back to the other of
> course if undefined.
>
> 2. The document mentions that the stabilization period for executing jobs
> is measured from the first resource event. I feel that if after the
> stabilization period we don't have sufficient resources, we should completely
> reset this timer and start the timeout from 0 when the next event arrives.
> This will be more in line with the concept of stabilization; otherwise, if
> you receive a batch of new resources you may not utilize it, because as soon
> as you have sufficient resources we rescale immediately.
>
> Cheers,
> Gyula

Re: [DISCUSS] FLIP-XXX: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-07-30 Thread Zdenek Tison
Hi all,

Based on the discussion, I added a new configuration:
*jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*.
We considered the following options for the default value:

   1. Use a separate default value, e.g., 60s.
   2. Fallback to
   *jobmanager.adaptive-scheduler.resource-stabilization-timeout*.
   3. Use the value from
   *jobmanager.adaptive-scheduler.scaling-interval.max.*
   4. Use a large number like Duration.ofMillis(Long.MAX_VALUE).

We decided against option 2 because, as discussed on the mailing list, the
value can be too low. Option 3 was also ruled out since the value can be too
high or unset, and *scaling-interval.max* serves a different use case (it
works well with *parallelism-increase*). Option 4 was not chosen because it
would affect existing jobs after migration: after migrating to the new Flink
version, rescaling would only happen if the desired resources were available,
whereas before migration rescaling happened with every resource change.

Therefore, I prefer a new default value: 60s.
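
As a rough illustration of option 1, the lookup below returns the user's value if set and the proposed 60s otherwise, without consulting any other key. It is a sketch using a plain map instead of Flink's configuration classes; the class name, method name, and map-based config are assumptions made for the example.

```java
import java.time.Duration;
import java.util.Map;

/** Hypothetical sketch of option 1: a dedicated default with no fallback to other keys. */
final class ExecutingTimeoutDefaultSketch {
    static final String EXECUTING_KEY =
            "jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout";

    /** The default value proposed in this thread. */
    static final Duration PROPOSED_DEFAULT = Duration.ofSeconds(60);

    /** Returns the explicitly configured value, or the proposed default if the key is unset. */
    static Duration resolve(Map<String, Duration> userConfig) {
        return userConfig.getOrDefault(EXECUTING_KEY, PROPOSED_DEFAULT);
    }
}
```

Under this model, a job that only sets *jobmanager.adaptive-scheduler.resource-stabilization-timeout* still gets 60s in the Executing state, which is the difference from option 2.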


Additionally, we reviewed the current set of parameters and think there is
a chance to align the parameters with the functionality as part of the
2.0 release. So, we propose to have these parameters:
*jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout*
*jobmanager.adaptive-scheduler.submission.resource-wait-timeout*

*jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling*
*jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*
*jobmanager.adaptive-scheduler.executing.rescale-trigger.max-checkpoint-failures*
*jobmanager.adaptive-scheduler.executing.rescale-trigger.max-delay*

Link to the updated FLIP doc.
<https://docs.google.com/document/d/1YeYSs64LqgUr3xyBTCjiRE-CT5VEyHjGjqxnxKPIQhM/edit>

Thanks a lot.

Regards,
Zdenek


Re: [DISCUSS] FLIP-XXX: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-07-30 Thread Zdenek Tison
Hi,

If there are no further comments, I would propose starting a vote on these
changes. But first, I would like to ask a committer to migrate the draft to
a FLIP in the Flink Wiki.

Thanks a lot.

Kind Regards,

Zdenek


Re: [DISCUSS] FLIP-XXX: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-08-04 Thread Zdenek Tison
Hi Rui,

Yes, that's right.
We will update the FLIP's parameters table to make it more explicit.

Thanks

Best,
Zdenek

On Mon, Aug 5, 2024 at 5:33 AM Rui Fan <1996fan...@gmail.com> wrote:

> Thanks Zdenek Tison and Matthias for driving this proposal!
> It's indeed a great improvement for Adaptive Scheduler.
>
> Sorry for the late reply, overall LGTM, I have one minor comment:
>
> These 2 configuration options were introduced in 2.0, which has not been
> released to users yet.
> So we can update them directly and don't need to consider them as fallback
> options, right?
>
> - jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count
> - jobmanager.adaptive-scheduler.max-delay-for-scale-trigger
>
> Best,
> Rui
>
> On Sat, Aug 3, 2024 at 12:20 AM Matthias Pohl 
> wrote:
>
> > Thanks Zdenek for addressing the comments. I copied the draft into the
> > FLIP collection under FLIP-472 [1].
> > Looks like there are no additional comments. Feel free to open a voting
> > thread on this proposal.
> >
> > Best,
> > Matthias
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states

[VOTE] FLIP-472: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-08-05 Thread Zdenek Tison
Hi everyone,


Thanks for all the feedback about FLIP-472: Aligning timeout logic in the
AdaptiveScheduler's WaitingForResources and Executing states [1]. The
discussion thread can be found here [2].


The vote will be open for at least 72 hours unless there are any objections
or insufficient votes.

The FLIP will be considered accepted if 3 binding votes (from active
committers according to the Flink bylaws [3]) are gathered by the community.



Best,


Zdenek


[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states



[2] https://lists.apache.org/thread/krnjv8fm62nbnrljmk3bfoons86pc1dw


[3]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws#FlinkBylaws-Approvals


Re: [DISCUSS] FLIP-XXX: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-08-05 Thread Zdenek Tison
Thanks everyone for your feedback. Since there were no open questions, we
have started voting [1].

Best,
Zdenek

[1] https://lists.apache.org/thread/3yod1cf06qxf5jny0hrbcxp33tlp7vs6

On Mon, Aug 5, 2024 at 9:03 AM Matthias Pohl 
wrote:

> Ok, I went ahead and updated the FLIP accordingly [1].
>
> [1]
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=315494203&selectedPageVersions=7&selectedPageVersions=5
>
> On Mon, Aug 5, 2024 at 8:56 AM Zdenek Tison 
> wrote:
>
> > Hi Rui,
> >
> > Yes, it's right.
> > We will update the FLIP's parameters table to make it more explicit.
> >
> > Thanks
> >
> > Best,
> > Zdenek
> >
> > On Mon, Aug 5, 2024 at 5:33 AM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Thanks Zdenek Tison and Mattias for driving this proposal!
> > > It's indeed a great improvement for Adaptive Scheduler.
> > >
> > > Sorry for the late reply, overall LGTM, I have one minor comment:
> > >
> > > These 2 configuration options were introduced since 2.0, and it's not
> > > released to users.
> > > So we can update them directly, and don't need to consider them as
> > fallback
> > > options, right?
> > >
> > > - jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count
> > > - jobmanager.adaptive-scheduler.max-delay-for-scale-trigger
> > >
> > > Best,
> > > Rui
> > >
> > > On Sat, Aug 3, 2024 at 12:20 AM Matthias Pohl
>  > >
> > > wrote:
> > >
> > > > Thanks Zdenek for addressing the comments. I copied the draft into
> the
> > > FLIP
> > > > collection under FLIP-472 [1].
> > > > Looks like there are no additional comments. Feel free to open a
> voting
> > > > thread on this proposal.
> > > >
> > > > Best,
> > > > Matthias
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states
> > > >
> > > > On Tue, Jul 30, 2024 at 10:48 AM Zdenek Tison
> > >  > > > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > If there are no further comments, I would propose starting a vote
> on
> > > > these
> > > > > changes. But first, I would like to ask a committer to migrate the
> > > draft
> > > > to
> > > > > an FLIP in the Flink Wiki.
> > > > >
> > > > > Thanks a lot.
> > > > >
> > > > > Kind Regards,
> > > > >
> > > > > Zdenek
> > > > >
> > > > > On Tue, Jul 30, 2024 at 10:36 AM Zdenek Tison  >
> > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Based on the discussion, I added a new configuration:
> > > > > >
> > > >
> > *jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*.
> > > > > > We considered the following options for the default value:
> > > > > >
> > > > > >1. Use a separate default value, e.g., 60s.
> > > > > >2. Fallback to
> > > > > >
> *jobmanager.adaptive-scheduler.resource-stabilization-timeout*.
> > > > > >3. Use the value from
> > > > > >*jobmanager.adaptive-scheduler.scaling-interval.max.*
> > > > > >4. Use a large number like Duration.ofMillis(Long.MAX_VALUE).
> > > > > >
> > > > > > > We decided against option 2 because, as discussed on the mailing list,
> > > > > > > the value can be too low. Option 3 was also ruled out since it can be
> > > > > > > too high or unset, and *scaling-interval.max* serves a different use
> > > > > > > case (it works well with *parallelism-increase*).
> > > > > > > Option 4 was not chosen because it would affect existing jobs after
> > > > > > > migration. After migrating to the new Flink version, rescaling would
> > > > > > > only happen if the desired resources were avai

[RESULT][VOTE] FLIP-472: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-08-08 Thread Zdenek Tison
Hi everyone,

I'm delighted to announce that FLIP-472 [1] has been accepted.

There were 5 votes in favor:
- Rui Fan (binding)
- Yuanfeng Hu (non-binding)
- David Moravek (binding)
- Matthias Pohl (binding)
- Yuepeng Pan (non-binding)

There were no votes against it.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states

Best,
Zdenek Tison


[jira] [Created] (FLINK-30403) The reported latest completed checkpoint is discarded

2022-12-13 Thread Zdenek Tison (Jira)
Zdenek Tison created FLINK-30403:


 Summary: The reported latest completed checkpoint is discarded
 Key: FLINK-30403
 URL: https://issues.apache.org/jira/browse/FLINK-30403
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.16.0
Reporter: Zdenek Tison


There is a small window in which the reported latest completed checkpoint can 
be marked as discarded while the new checkpoint has not been reported yet. 

The reason is that the function _addCompletedCheckpointToStoreAndSubsumeOldest_ 
is called before _reportCompletedCheckpoint_ in _CheckpointCoordinator_.
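
The window described above can be reproduced in a few lines. The sketch below 
is a simplified, hypothetical model: the Store class, the retention limit of 
one checkpoint, and all method names other than the two quoted above are 
assumptions made for illustration, not Flink's CheckpointCoordinator code. It 
only shows that storing and subsuming before reporting leaves a moment in 
which the last reported checkpoint is already discarded.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the ordering problem; not Flink code.
class CheckpointReportWindow {

    /** Retains at most maxRetained checkpoint ids and discards older ones. */
    static final class Store {
        private final int maxRetained;
        private final Deque<Long> retained = new ArrayDeque<>();
        private final Set<Long> discarded = new HashSet<>();

        Store(int maxRetained) {
            this.maxRetained = maxRetained;
        }

        void addAndSubsumeOldest(long checkpointId) {
            retained.addLast(checkpointId);
            while (retained.size() > maxRetained) {
                discarded.add(retained.removeFirst());
            }
        }

        boolean isDiscarded(long checkpointId) {
            return discarded.contains(checkpointId);
        }
    }

    /** Returns true if the reported checkpoint is discarded between the two steps. */
    static boolean reportedCheckpointIsDiscardedInWindow() {
        Store store = new Store(1); // assumption: only one checkpoint is retained
        store.addAndSubsumeOldest(1L);
        long reportedLatest = 1L; // checkpoint 1 is stored and reported

        // Step 1: store checkpoint 2, which subsumes (discards) checkpoint 1.
        store.addAndSubsumeOldest(2L);

        // A reader looking at the reported value here still sees checkpoint 1.
        boolean staleInWindow = store.isDiscarded(reportedLatest);

        // Step 2: only now is checkpoint 2 reported as the latest one.
        reportedLatest = 2L;

        return staleInWindow && !store.isDiscarded(reportedLatest);
    }

    public static void main(String[] args) {
        System.out.println(
                "reported checkpoint discarded inside the window: "
                        + reportedCheckpointIsDiscardedInWindow());
    }
}
```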

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36011) Generalize RescaleManager to become StateTransitionManager

2024-08-08 Thread Zdenek Tison (Jira)
Zdenek Tison created FLINK-36011:


 Summary: Generalize RescaleManager to become StateTransitionManager
 Key: FLINK-36011
 URL: https://issues.apache.org/jira/browse/FLINK-36011
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zdenek Tison
 Fix For: 2.0.0


The goal is to change the RescaleManager component to one with a broader 
responsibility that will manage the adaptive scheduler's state transitions.   





[jira] [Created] (FLINK-36012) Integrate StateTransitionManager into WaitingForResources state

2024-08-08 Thread Zdenek Tison (Jira)
Zdenek Tison created FLINK-36012:


 Summary: Integrate StateTransitionManager into WaitingForResources 
state
 Key: FLINK-36012
 URL: https://issues.apache.org/jira/browse/FLINK-36012
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zdenek Tison


The StateTransitionManager will be used in the WaitingForResources state to 
manage the transition to a subsequent state. 





[jira] [Created] (FLINK-36013) Introduce the transition from Restarting to CreatingExecutionGraph state

2024-08-08 Thread Zdenek Tison (Jira)
Zdenek Tison created FLINK-36013:


 Summary: Introduce the transition from Restarting to 
CreatingExecutionGraph state
 Key: FLINK-36013
 URL: https://issues.apache.org/jira/browse/FLINK-36013
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zdenek Tison


The AdaptiveScheduler omits the WaitingForResources state when rescaling. Pass 
a flag into the Restarting state that directs the state transition to the 
CreatingExecutionGraph state instead of WaitingForResources. 
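
A minimal sketch of the idea, assuming a boolean flag passed in at 
construction time. The class, enum, and flag names here are hypothetical and 
chosen for illustration; they are not the AdaptiveScheduler's actual types or 
signatures.

```java
// Hypothetical sketch of a flag-directed transition; not Flink's Restarting state.
class RestartingTransitionSketch {

    enum NextState {
        WAITING_FOR_RESOURCES,
        CREATING_EXECUTION_GRAPH
    }

    static final class Restarting {
        // Assumed flag name: true when the restart was triggered by a rescale.
        private final boolean skipWaitingForResources;

        Restarting(boolean skipWaitingForResources) {
            this.skipWaitingForResources = skipWaitingForResources;
        }

        /** Chooses the follow-up state once the restart has completed. */
        NextState nextState() {
            return skipWaitingForResources
                    ? NextState.CREATING_EXECUTION_GRAPH
                    : NextState.WAITING_FOR_RESOURCES;
        }
    }

    public static void main(String[] args) {
        System.out.println("rescale restart -> " + new Restarting(true).nextState());
        System.out.println("regular restart -> " + new Restarting(false).nextState());
    }
}
```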





[jira] [Created] (FLINK-36014) Align the desired and sufficient resources definition in Executing and WaitingForResources states

2024-08-08 Thread Zdenek Tison (Jira)
Zdenek Tison created FLINK-36014:


 Summary: Align the desired and sufficient resources definition in 
Executing and WaitingForResources states
 Key: FLINK-36014
 URL: https://issues.apache.org/jira/browse/FLINK-36014
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zdenek Tison


The goal is to use the same definition for the desired and sufficient resources 
in the Executing state as in the WaitingForResources state. 





[jira] [Created] (FLINK-36015) Align rescale parameters

2024-08-08 Thread Zdenek Tison (Jira)
Zdenek Tison created FLINK-36015:


 Summary: Align rescale parameters
 Key: FLINK-36015
 URL: https://issues.apache.org/jira/browse/FLINK-36015
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Zdenek Tison


* Parameter 
[_jobmanager.adaptive-scheduler.resource-wait-timeout_|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-wait-timeout]
 will be renamed to 
jobmanager.adaptive-scheduler.submission.resource-wait-timeout.
* Parameter 
[_jobmanager.adaptive-scheduler.resource-stabilization-timeout_|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-wait-timeout]
 will be renamed to 
jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout.
* Parameter 
[_jobmanager.adaptive-scheduler.scaling-interval.min_|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min]
 will be renamed to 
jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling.
* Parameter 
[_jobmanager.adaptive-scheduler.scaling-interval.max_|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max]
 will be renamed to 
jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout, 
with a default value of 60s.
* Parameter 
[_jobmanager.adaptive-scheduler.min-parallelism-increase_|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-min-parallelism-increase]
 will be removed without a direct replacement. Its role is covered by the 
combination of 
jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling and 
jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout.
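
The renames listed above can be summarized as a lookup table. The sketch 
below uses plain Java maps; the class name and the migrate helper are 
hypothetical and are not part of Flink's configuration API. Dropping the 
removed key during migration is an assumption made for illustration; the 
issue only states that the parameter has no direct replacement.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a plain-Java view of the renames listed in this issue.
// This is not Flink's configuration API.
class RescaleParameterRenames {

    private static final String PREFIX = "jobmanager.adaptive-scheduler.";

    /** Old key to new key, as listed in the issue description. */
    static final Map<String, String> RENAMED = new LinkedHashMap<>();

    static {
        RENAMED.put(
                PREFIX + "resource-wait-timeout",
                PREFIX + "submission.resource-wait-timeout");
        RENAMED.put(
                PREFIX + "resource-stabilization-timeout",
                PREFIX + "submission.resource-stabilization-timeout");
        RENAMED.put(
                PREFIX + "scaling-interval.min",
                PREFIX + "executing.cooldown-after-rescaling");
        RENAMED.put(
                PREFIX + "scaling-interval.max",
                PREFIX + "executing.resource-stabilization-timeout");
    }

    /** Removed without a direct replacement. */
    static final String REMOVED = PREFIX + "min-parallelism-increase";

    /** Hypothetical helper: rewrites old keys and drops the removed one (assumption). */
    static Map<String, String> migrate(Map<String, String> oldConfig) {
        Map<String, String> migrated = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : oldConfig.entrySet()) {
            if (entry.getKey().equals(REMOVED)) {
                continue;
            }
            String newKey = RENAMED.getOrDefault(entry.getKey(), entry.getKey());
            migrated.put(newKey, entry.getValue());
        }
        return migrated;
    }

    public static void main(String[] args) {
        Map<String, String> old = new LinkedHashMap<>();
        old.put(PREFIX + "scaling-interval.min", "30s");
        old.put(REMOVED, "1");
        System.out.println(migrate(old));
    }
}
```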





[jira] [Created] (FLINK-36016) Synchronize initialization time and clock usage

2024-08-08 Thread Zdenek Tison (Jira)
Zdenek Tison created FLINK-36016:


 Summary: Synchronize initialization time and clock usage 
 Key: FLINK-36016
 URL: https://issues.apache.org/jira/browse/FLINK-36016
 Project: Flink
  Issue Type: Sub-task
Reporter: Zdenek Tison


StateTransitionManager's initialization time and the clock parameter should be 
based on the same time.
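
One way to read this requirement: the component should take its 
initialization timestamp from the same clock it later uses for comparisons, 
rather than from a separate wall-clock call. The sketch below illustrates 
that with java.time.Clock. The Manager and ManualClock classes and the 
cooldown check are hypothetical stand-ins, not the StateTransitionManager's 
actual implementation.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical sketch; not Flink's StateTransitionManager.
class SharedClockSketch {

    /** Test clock that only moves when told to. */
    static final class ManualClock extends Clock {
        private Instant now;

        ManualClock(Instant start) {
            this.now = start;
        }

        void advance(Duration duration) {
            now = now.plus(duration);
        }

        @Override
        public ZoneId getZone() {
            return ZoneOffset.UTC;
        }

        @Override
        public Clock withZone(ZoneId zone) {
            return this;
        }

        @Override
        public Instant instant() {
            return now;
        }
    }

    /** Stand-in for a component with a time-based cooldown. */
    static final class Manager {
        private final Clock clock;
        private final Instant initializationTime;
        private final Duration cooldown;

        Manager(Clock clock, Duration cooldown) {
            this.clock = clock;
            // Derived from the injected clock, so both values share one time source.
            this.initializationTime = clock.instant();
            this.cooldown = cooldown;
        }

        boolean cooldownElapsed() {
            return !clock.instant().isBefore(initializationTime.plus(cooldown));
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock(Instant.EPOCH);
        Manager manager = new Manager(clock, Duration.ofSeconds(10));
        System.out.println("elapsed at start: " + manager.cooldownElapsed());
        clock.advance(Duration.ofSeconds(10));
        System.out.println("elapsed after 10s: " + manager.cooldownElapsed());
    }
}
```

If the initialization time came from a different source than the injected 
clock, a test that advances the manual clock could not predict when the 
cooldown ends.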


