[ https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rui Fan updated FLINK-36535:
----------------------------
    Description: 
This is a follow-up to FLINK-36018, which supported lazy scale down to avoid frequent rescaling.
h1. Proposed Change

Treat scale-down.interval as a window (a sketch of this window logic follows the list):
 * Record the scale-down trigger time when the recommended parallelism < current parallelism
 ** When the recommended parallelism >= current parallelism, cancel the triggered scale down
 * The scale down is executed when currentTime - triggerTime > scale-down.interval
 ** {color:#de350b}Change1{color}: Use the maximum parallelism within the window instead of the latest parallelism when scaling down.
 * {color:#de350b}Change2{color}: Never scale down when currentTime - triggerTime < scale-down.interval
 ** In FLINK-36018, the scale down may be executed even when currentTime - triggerTime < scale-down.interval.
 ** For example: taskA may be scaled down when taskB needs to scale up.
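A minimal sketch of the proposed window logic, assuming hypothetical class and field names (the real autoscaler code in flink-kubernetes-operator may be organized differently):

{code:java}
import java.time.Duration;
import java.time.Instant;

/** Sketch of the delayed scale-down window for a single vertex (names are hypothetical). */
public class DelayedScaleDownWindow {

    private final Duration scaleDownInterval; // job.autoscaler.scale-down.interval

    // State of the currently triggered (but not yet executed) scale down.
    private Instant triggerTime;              // null means no scale down is pending
    private int maxRecommendedParallelism;    // Change1: maximum seen inside the window

    public DelayedScaleDownWindow(Duration scaleDownInterval) {
        this.scaleDownInterval = scaleDownInterval;
    }

    /** Called on every evaluation; returns the parallelism to apply now. */
    public int onRecommendation(int currentParallelism, int recommendedParallelism, Instant now) {
        if (recommendedParallelism >= currentParallelism) {
            // Scale up (or stay): cancel any pending scale down.
            triggerTime = null;
            return recommendedParallelism;
        }
        if (triggerTime == null) {
            // First recommendation below the current parallelism: open the window.
            triggerTime = now;
            maxRecommendedParallelism = recommendedParallelism;
        } else {
            // Change1: remember the highest recommendation seen inside the window.
            maxRecommendedParallelism = Math.max(maxRecommendedParallelism, recommendedParallelism);
        }
        if (now.isAfter(triggerTime.plus(scaleDownInterval))) {
            // Window elapsed: execute the scale down using the window maximum.
            triggerTime = null;
            return maxRecommendedParallelism;
        }
        // Change2: inside the window, never scale down, even if the job is rescaled
        // because another vertex needs a scale up.
        return currentParallelism;
    }
}
{code}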

h1. Background

Some critical Flink jobs need to scale up in time, but only scale down on a 
daily basis. In other words, Flink users do not want Flink jobs to be scaled 
down multiple times within 24 hours, and the jobs run at the same parallelism 
as during the peak hours of each day. 

Note: Users want a scale down to happen only when even the peak-hour parallelism wastes resources. This is a trade-off between downtime and resource waste for a critical job.
h1. Current solution

In general, this requirement can be met by setting {color:#de350b}job.autoscaler.scale-down.interval = 24 hours{color}. For example, taskA runs with parallelism 100, and the recommended parallelism reaches 100 during the peak hours of each day. We expect taskA never to be rescaled, because the triggered scale down is canceled once the recommended parallelism >= current parallelism within 24 hours (this is exactly what FLINK-36018 does).
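For reference, a minimal sketch of setting this option programmatically (assuming the option is supplied via the job's Flink configuration; the duration string follows Flink's duration syntax):

{code:java}
import org.apache.flink.configuration.Configuration;

public class AutoscalerConfigExample {
    public static void main(String[] args) {
        // Sketch: delay scale downs by one day.
        Configuration conf = new Configuration();
        conf.setString("job.autoscaler.scale-down.interval", "24 h");
        System.out.println(conf);
    }
}
{code}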
h1. Unexpected Scenario & How to Solve It

However, I found that a critical production job is still rescaled about 10 times every day (with scale-down.interval set to 24 hours).

Root cause: a job may have many sources, and the traffic peaks of these sources may occur at different times. When taskA triggers a scale down, that scale down is not actively executed within 24 hours, but it may be executed as a side effect when other tasks are scaled up.

For example:
 * The scale down of sourceB and sourceC may be executed when sourceA scales up.
 * After a while, the scale down of sourceA and sourceC may be executed when sourceB scales up.
 * After a while, the scale down of sourceA and sourceB may be executed when sourceC scales up.
 * When there are many tasks, the above 3 steps are executed repeatedly.

That's why the job is rescaled about 10 times every day. {color:#de350b}Change2{color} of the proposed change solves this issue: never scale down when currentTime - triggerTime < scale-down.interval.
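A minimal sketch of how Change2 behaves at the job level (hypothetical names; it only illustrates that a vertex with a pending scale down keeps its current parallelism while its own window has not elapsed, even when the job is rescaled because another vertex scales up):

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Illustration of Change2: pending scale downs do not piggyback on other vertices' scale ups. */
public class Change2Example {

    record VertexState(int currentParallelism, int recommendedParallelism, Instant scaleDownTriggerTime) {}

    static Map<String, Integer> decideParallelism(
            Map<String, VertexState> vertices, Duration scaleDownInterval, Instant now) {
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, VertexState> e : vertices.entrySet()) {
            VertexState v = e.getValue();
            if (v.recommendedParallelism() >= v.currentParallelism()) {
                // Scale up immediately (or keep the current parallelism).
                result.put(e.getKey(), v.recommendedParallelism());
            } else if (v.scaleDownTriggerTime() != null
                    && now.isAfter(v.scaleDownTriggerTime().plus(scaleDownInterval))) {
                // This vertex's own window has elapsed: the scale down may be executed.
                result.put(e.getKey(), v.recommendedParallelism());
            } else {
                // Change2: the window has not elapsed, so keep the current parallelism,
                // even if the job is rescaled for some other vertex.
                result.put(e.getKey(), v.currentParallelism());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        Map<String, VertexState> vertices = Map.of(
                "sourceA", new VertexState(80, 100, null),                            // needs a scale up
                "sourceB", new VertexState(100, 70, now.minus(Duration.ofHours(3))),  // pending scale down
                "sourceC", new VertexState(100, 60, now.minus(Duration.ofHours(5)))); // pending scale down
        // With a 24h interval, sourceB and sourceC keep parallelism 100 while sourceA scales up.
        System.out.println(decideParallelism(vertices, Duration.ofHours(24), now));
    }
}
{code}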

 

{color:#de350b}Change1{color}: Use the maximum parallelism within the window instead of the latest parallelism when scaling down.
 * This ensures that the parallelism after scaling down matches the parallelism at yesterday's peak.

  was:
This is a follow-up to FLINK-36018, which supported lazy scale down to avoid frequent rescaling.
h1. Background

Some critical Flink jobs need to scale up in time, but only scale down on a 
daily basis. In other words, Flink users do not want Flink jobs to be scaled 
down multiple times within 24 hours, and the jobs run at the same parallelism 
as during the peak hours of each day. 

Note: Users want a scale down to happen only when even the peak-hour parallelism wastes resources. This is a trade-off between downtime and resource waste for a critical job.
h1. Current solution

In general, this requirement can be met by setting {color:#de350b}job.autoscaler.scale-down.interval = 24 hours{color}. For example, vertex1 runs with parallelism=100, and the following is the parallelism that the autoscaler recommends for vertex1:
 * 100 (2024-10-13 20:00:00, peak hour)
 * 90   (2024-10-13 21:00:00, trigger delayed scale down)
 * 80   (2024-10-13 22:00:00)
 * 70   (2024-10-14 00:00:00)
 * 60   (2024-10-14 01:00:00)
 * 50   (2024-10-14 02:00:00)
 * 40   (2024-10-14 04:00:00)
 * 50   (2024-10-14 06:00:00)
 * 60   (2024-10-14 08:00:00)
 * ...
 * 90   (2024-10-14 19:00:00)
 * 100 (2024-10-14 20:00:00, peak hour, the delayed scale down is canceled)

All recommended scale downs are delayed, and the recommended parallelism returns to 100 within 24 hours, so the scale-down request is canceled.

This means that if the recommended parallelism for vertex1 during peak hours is 100 every day, vertex1 is never scaled down or scaled up. This is very friendly to critical jobs, and reducing the rescale frequency greatly reduces the downtime.
h1. Some scenarios do not work as expected

When scale down occurs, the latest recommended parallelism is used, not the 
highest parallelism in the last 24 hours. For example:
 * 100 (2024-10-13 20:00:00, peak hour)
 * 100 (2024-10-13 21:00:00, peak hour)
 * 80   (2024-10-13 22:00:00, trigger delayed scale down)
 * 60   (2024-10-14 01:00:00)
 * 40   (2024-10-14 04:00:00)
 * 60   (2024-10-14 08:00:00)
 * ...
 * 90   (2024-10-14 20:00:00, peak hour)
 * 90   (2024-10-14 21:00:00, peak hour)
 * 70   (2024-10-14 22:00:00, scale down happens)

In this case, the peak traffic on 2024-10-14 is lower than the peak traffic on 
2024-10-13. Therefore, the parallelism on 2024-10-14 is 90, which is lower than 
the parallelism of 100 during the peak period on 2024-10-13.

So the delayed scale down happens at 2024-10-14 22:00:00, and the resulting parallelism is 70.

{color:#de350b}Unexpected logic{color}:
 * It would be better to use 90 as the new parallelism instead of 70.
 * Generally, {color:#de350b}it is better to use the highest parallelism in the last 24 hours (job.autoscaler.scale-down.interval) as the new parallelism{color} instead of the latest parallelism.

Reason:
 * When we use the latest parallelism as the new parallelism, there is a high probability that a scale up will be needed in the next 24 hours (see the sketch after this list).
 * Also, if job.autoscaler.scale-down.interval is 36 hours, the scale down may occur during a low-traffic period. The latest parallelism may then be 40 or 50, and again a scale up will very likely be needed in the next 24 hours.
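A tiny worked sketch of the difference, using the recommendations from the example above (a hypothetical helper that just compares the latest recommendation with the maximum inside the window):

{code:java}
import java.util.List;

public class WindowMaxExample {
    public static void main(String[] args) {
        // Recommendations recorded since the scale down was triggered at 2024-10-13 22:00:00.
        List<Integer> windowRecommendations = List.of(80, 60, 40, 60, 90, 90, 70);

        int latest = windowRecommendations.get(windowRecommendations.size() - 1);                        // 70
        int windowMax = windowRecommendations.stream().mapToInt(Integer::intValue).max().orElse(latest); // 90

        // Scaling to 70 likely forces another scale up at the next peak;
        // scaling to 90 matches the most recent peak inside the window.
        System.out.println("latest=" + latest + ", windowMax=" + windowMax);
    }
}
{code}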

h1. Solution
h2. Solution 1: introduce an option

Introduce an option that controls whether the highest parallelism within the job.autoscaler.scale-down.interval window or the latest parallelism is applied.
h2. Solution 2: do not introduce an option
 * When job.autoscaler.scale-down.interval >= 24 hours, use the highest parallelism within the job.autoscaler.scale-down.interval window (a sketch of this rule follows the list).
 ** We can assume that the user wants the job to achieve the goals mentioned in the Background section when job.autoscaler.scale-down.interval >= 24 hours.
 * Otherwise, use the latest parallelism.
 ** We can assume that the user wants to scale down within a day.
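A minimal sketch of Solution 2's decision rule (hypothetical method name; it only expresses the threshold described above):

{code:java}
import java.time.Duration;

public class ScaleDownParallelismRule {

    /**
     * Solution 2 (sketch): with an interval of at least 24 hours, scale down to the highest
     * parallelism recommended inside the window; otherwise use the latest recommendation.
     */
    static int targetParallelism(Duration scaleDownInterval, int windowMax, int latestRecommended) {
        return scaleDownInterval.compareTo(Duration.ofHours(24)) >= 0 ? windowMax : latestRecommended;
    }

    public static void main(String[] args) {
        System.out.println(targetParallelism(Duration.ofHours(24), 90, 70)); // 90
        System.out.println(targetParallelism(Duration.ofHours(1), 90, 70));  // 70
    }
}
{code}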


> Optimize the scale down logic based on historical parallelism
> -------------------------------------------------------------
>
>                 Key: FLINK-36535
>                 URL: https://issues.apache.org/jira/browse/FLINK-36535
>             Project: Flink
>          Issue Type: Improvement
>          Components: Autoscaler
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>


