[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868168#comment-17868168
 ] 

Gyula Fora commented on FLINK-35285:
------------------------------------

{noformat}
I would argue that a current parallelism 40 is not very close to the max 
parallelism of 120, though. Maybe our patterns are outside the norm? But to me 
this seems well within a "normal" range.{noformat}
My production recommendation is to start with 720 as the max parallelism for 
anything but the smallest jobs. Your parallelism setting is definitely in the 
normal range but I think your max parallelism is not ideal given how key 
distribution works for Flink especially if you want to use the autoscaler.

I don't really understand your proposal, let's say the ideal parallelism 
computed using the scale factor would be X, the autoscaler can decide to scale 
to anything larger than X as that would satisfy the throughput requirement but 
we can never scale to lower than X. This bound is true for both scale ups and 
scale downs. We always have to find the closes to X that is still larger or 
equal to it.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --------------------------------------------------------------------------
>
>                 Key: FLINK-35285
>                 URL: https://issues.apache.org/jira/browse/FLINK-35285
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>            Reporter: Trystan
>            Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but something to ensure it can make at least some progress. There is 
> another test that now fails, but just to illustrate the point:
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) 
> {
>     if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p 
> > currentParallelism)) {
>         if (maxParallelism % p == 0) {
>             return p;
>         }
>     }
> } {code}
>  
> Perhaps this is by design and not a bug, but total failure to scale down in 
> order to keep optimized key groups does not seem ideal.
>  
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to