Hi,

If there are no further comments, I would propose starting a vote on these changes. But first, I would like to ask a committer to migrate the draft to a FLIP in the Flink Wiki.
Thanks a lot.

Kind Regards,
Zdenek

On Tue, Jul 30, 2024 at 10:36 AM Zdenek Tison <zti...@confluent.io> wrote:

> Hi all,
>
> Based on the discussion, I added a new configuration: *jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*. We considered the following options for the default value:
>
> 1. Use a separate default value, e.g., 60s.
> 2. Fall back to *jobmanager.adaptive-scheduler.resource-stabilization-timeout*.
> 3. Use the value from *jobmanager.adaptive-scheduler.scaling-interval.max*.
> 4. Use a large number like Duration.ofMillis(Long.MAX_VALUE).
>
> We decided against option 2 because, as discussed on the mailing list, the value can be too low. Option 3 was also ruled out since it can be too high or unset, and *scaling-interval.max* serves a different use case (it works well with *parallelism-increase*). Option 4 was not chosen because it would affect existing jobs after migration. After migrating to the new Flink version, rescaling would only happen if the desired resources were available. However, before migration, rescaling happened with every resource change.
>
> Therefore, I prefer a new default value: 60s.
>
> Additionally, we reviewed the current set of parameters and think there is a chance to align the parameters along the functionality with the release of 2.0. So, we propose to have these parameters:
>
> *jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout*
> *jobmanager.adaptive-scheduler.submission.resource-wait-timeout*
>
> *jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling*
> *jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*
>
> *jobmanager.adaptive-scheduler.executing.rescale-trigger.max-checkpoint-failures*
> *jobmanager.adaptive-scheduler.executing.rescale-trigger.max-delay*
>
> Link to the updated FLIP doc.
> <https://docs.google.com/document/d/1YeYSs64LqgUr3xyBTCjiRE-CT5VEyHjGjqxnxKPIQhM/edit>
>
> Thanks a lot.
>
> Regards,
> Zdenek
>
> On Wed, Jul 24, 2024 at 2:22 PM Zdenek Tison <zti...@confluent.io> wrote:
>
>> Hi Gyula,
>>
>> Thank you for reviewing the document and providing feedback.
>>
>> 1. I agree that we need two separate parameters for stabilization intervals in different states. I will update the FLIP document accordingly.
>> 2. That's correct. We reached the same conclusion while prototyping the implementation. I will add a new bullet point to the FLIP document.
>>
>> Thanks a lot.
>>
>> Regards,
>> Zdenek
>>
>> On Tue, Jul 23, 2024 at 3:02 PM Gyula Fóra <gyf...@apache.org> wrote:
>>
>>> Hi All!
>>>
>>> Thank you for the proposal, I think it will be great to simplify the current rescaling flow to make it more digestible :)
>>>
>>> I have 2 comments:
>>>
>>> 1. Related to what Matthias already pointed out, I think in production scenarios it may be a typical requirement to have a fairly short stabilization interval for job startup (reduce downtime) but overall a longer stabilization period for Executing jobs before rescaling to avoid fluctuations and therefore reduce downtime. I think it would be very important to have 2 configs for that; one could fall back to the other, of course, if undefined.
>>>
>>> 2. The document mentions that the stabilization period for executing jobs is measured from the first resource event. I feel that if after the stabilization period we don't have sufficient resources, we should completely reset this timer and start the timeout from 0 when the next event arrives. This will be more in line with the concept of stabilization; otherwise, if you receive a batch of new resources you may not utilize it, because as soon as you have sufficient resources we rescale immediately.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Thu, Jul 18, 2024 at 9:58 AM Zdenek Tison <zti...@confluent.io.invalid> wrote:
>>>
>>> > Thanks, Matthias, for your opinions.
>>> >
>>> > I see two scenarios where different values for starting and rescaling would be appropriate:
>>> >
>>> > 1) Flink serverless providers may prefer the fastest possible job startup time, which can also be achieved by setting a smaller value for the stabilization timeout, such as 1 second, in the WaitingForResources state. Conversely, to ensure maximum job uptime, it would be prudent to increase the stabilization period for rescaling to a higher value, such as 1 minute, to handle server/node maintenance effectively.
>>> >
>>> > 2) In Reactive mode, the stabilization period is set to 0 by default. Setting a different default value for the rescale state could enhance job stability during node maintenance, especially since the parameter min-parallelism-increase is no longer applicable.
>>> >
>>> > Regards,
>>> >
>>> > Zdenek
>>> >
>>> > On Tue, Jul 16, 2024 at 5:49 PM Matthias Pohl <map...@apache.org> wrote:
>>> >
>>> > > Thanks Zdenek for your proposal on aligning the resource control logic within the AdaptiveScheduler and cleaning up the rescaling code.
>>> > >
>>> > > Consolidating the parameters and the code as part of the 2.0 release makes sense in my opinion: The proposed change adds consistent behavior to the WaitingForResources and Executing states of the AdaptiveScheduler and irons out some flaws of the current implementation. This should help users get a clearer picture of the resource control logic. Removing obsolete rescale waiting time if only sufficient resources are available is also a nice improvement.
>>> > >
>>> > > The j.a.min-parallelism-increase [1] parameter became kind of obsolete with the introduction of the rescale REST endpoint in FLIP-291 [2] as you pointed out in the FLIP. So, deprecating it sounds reasonable.
>>> > >
>>> > > On the topic of replacing the j.a.scaling-interval.max parameter [3] with the j.a.resource-stabilization-timeout [4]: I'm in favor of reducing the complexity of the Flink configuration. Therefore, using one parameter for both (WaitingForResources and Executing state) to stabilize the resources sounds like a good idea.
>>> > >
>>> > > I'm wondering whether there are scenarios where we would want to have different stabilization timeouts for starting (WaitingForResources) and rescaling (Executing) a job. In that case, having two resource stabilization parameters (one for job starts and one for rescales) with one being the fallback for the other is a straightforward solution.
>>> > >
>>> > > Just as a side note because it came up: Keep in mind that FLIP-461 still allows for immediate rescaling on a change event if checkpointing is disabled or j.a.max-delay-for-scale-trigger [5] is configured accordingly.
>>> > >
>>> > > Best,
>>> > > Matthias
>>> > >
>>> > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-min-parallelism-increase
>>> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
>>> > > [3] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max
>>> > > [4] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout
>>> > > [5] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-max-delay-for-scale-trigger
>>> > >
>>> > > On Tue, Jul 16, 2024 at 3:05 PM Zdenek Tison <zti...@confluent.io.invalid> wrote:
>>> > >
>>> > > > Hi, I'd like to move a discussion from Google Docs to the mailing list so that it's visible to everyone.
>>> > > >
>>> > > > *Yuanfeng Hu* brought up two concerns:
>>> > > >
>>> > > > 1) Related to the resource-stabilization-timeout, he thinks 10s may be too short. In a container environment, if the number of tm added by rest requests is greater than 1, the tm initialization time may be much longer than 10s.
>>> > > >
>>> > > > and
>>> > > >
>>> > > > 2) He proposed a little scenario:
>>> > > > There is 1 slot in the entire cluster. At this time, my task is running at 1 parallelism (the required slot is also 1). Then I add a tm (1 slot), which will obviously trigger a change event, and it will become stable after 10 seconds.
>>> > > > If I change the required resources to 3 through rest at this time, rescale will be triggered immediately and the job runs at a parallelism of 2. Is this the expected result, or do we expect that the rescale will be triggered after adding another tm, because this exactly matches the required resources?
>>> > > >
>>> > > > Thank you, *Yuanfeng Hu*, for opening the discussion.
>>> > > >
>>> > > > ---------------------------------------------------------------------------------------
>>> > > >
>>> > > > 1) Regarding the stabilization period:
>>> > > >
>>> > > > I am unsure what you mean by the part, 'if the number of tm added by rest requests is greater than 1.' However, I understand that it can take some time to spawn additional containers/pods in a containerized environment. On the other hand, if a user adds more TMs, for instance, by increasing the number of replicas in a Kubernetes deployment, these replicas should appear with some delay but at a similar time, correct?
>>> > > >
>>> > > > It's worth mentioning that since FLIP-461 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler>, the rescale operation is synchronized with checkpoint events, so the rescale doesn't happen right after this timeout expires.
>>> > > >
>>> > > > If we believe it is necessary to have different values for the stabilization period in the Executing and WaitingForResources states, even though this increases configuration complexity slightly, we could have separate parameters for these two states: jobmanager.adaptive-scheduler.resource-stabilization-timeout <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout> and *jobmanager.adaptive-scheduler.scaling-stabilization-timeout* (replacing the jobmanager.adaptive-scheduler.scaling-interval.max <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max>).
>>> > > >
>>> > > > *2)* Regarding the proposed scenario:
>>> > > >
>>> > > > The same behavior occurs in the current Flink version when the `min-parallelism-increase` is set to its default value 1. In this case, the rescale operation is triggered immediately or aligned with the checkpoint event (specified in FLIP-461).
>>> > > > So, I would say the behavior is expected.
>>> > > > Additionally, users can configure the rescaling behavior. For example, if a user sets the lower bound parallelism to 2 and the upper bound to 3, the system will rescale after 10 seconds. Alternatively, if the user sets the same value for the lower and upper bounds, the rescale operation will wait until all slots are available.
>>> > > >
>>> > > > Best Regards,
>>> > > > Zdenek Tison
>>> > > >
>>> > > > On Thu, Jul 11, 2024 at 2:38 PM Zdenek Tison <zti...@confluent.io> wrote:
>>> > > >
>>> > > > > Hello,
>>> > > > >
>>> > > > > Our team has been working on several improvements for AdaptiveScheduler, specifically focusing on aligning logic and timeouts in the WaitingForResources and Executing states. We believe these enhancements will improve the adaptive scheduler's robustness and maintainability.
>>> > > > >
>>> > > > > For more detailed information, please refer to the FLIP document.
>>> > > > >
>>> > > > > https://docs.google.com/document/d/1YeYSs64LqgUr3xyBTCjiRE-CT5VEyHjGjqxnxKPIQhM/edit?usp=sharing
>>> > > > >
>>> > > > > Thanks,
>>> > > > > Zdenek Tison
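For readers who want a concrete picture of two ideas from this thread, here is a minimal Python sketch. It is not Flink code: `resolve_executing_timeout`, `StabilizationTimer`, and all of their parameters are hypothetical names made up for illustration, and the model is deliberately simplified (it ignores checkpoint alignment from FLIP-461 and the cooldown after rescaling). It shows option 1 from the Jul 30 message (a separate 60s default when the executing timeout is unset) and the timer behavior Gyula suggested (measure the window from the first resource event, and start again from zero if the window expires without sufficient resources).

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical illustration only; none of these names exist in Flink.
DEFAULT_EXECUTING_TIMEOUT_S = 60.0  # option 1 in the thread: a separate default


def resolve_executing_timeout(configured_s: Optional[float]) -> float:
    """Return the configured executing timeout, or the separate default if unset."""
    if configured_s is not None:
        return configured_s
    return DEFAULT_EXECUTING_TIMEOUT_S


@dataclass
class StabilizationTimer:
    timeout_s: float
    # Time of the first resource event in the current window, None if no window is open.
    started_at: Optional[float] = None

    def on_resource_event(self, now: float) -> None:
        # The window is measured from the FIRST event; later events do not extend it.
        if self.started_at is None:
            self.started_at = now

    def on_check(self, now: float, sufficient: bool) -> bool:
        """Return True if a rescale should be triggered at time `now`."""
        if self.started_at is None or now - self.started_at < self.timeout_s:
            return False  # no open window, or still stabilizing
        # Window expired: close it either way, so the next resource
        # event starts a fresh window from zero.
        self.started_at = None
        return sufficient
```

With a 10 second timeout, an event at t=0 followed by a check at t=10 without sufficient resources closes the window without rescaling; an event at t=12 then opens a new window, and a check at t=22 with sufficient resources triggers the rescale.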