[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868168#comment-17868168 ]
Gyula Fora commented on FLINK-35285:
------------------------------------

{noformat}
I would argue that a current parallelism 40 is not very close to the max parallelism of 120, though. Maybe our patterns are outside the norm? But to me this seems well within a "normal" range.{noformat}

My production recommendation is to start with 720 as the max parallelism for anything but the smallest jobs. Your parallelism setting is definitely in the normal range, but I think your max parallelism is not ideal given how key distribution works in Flink, especially if you want to use the autoscaler.

I don't really understand your proposal. Let's say the ideal parallelism computed from the scale factor is X: the autoscaler can decide to scale to anything larger than X, since that would still satisfy the throughput requirement, but we can never scale to anything lower than X. This bound holds for both scale-ups and scale-downs. We always have to find the parallelism closest to X that is still larger than or equal to it.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --------------------------------------------------------------------------
>
>                 Key: FLINK-35285
>                 URL: https://issues.apache.org/jira/browse/FLINK-35285
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>            Reporter: Trystan
>            Priority: Minor
>
> When setting a less aggressive scale-down limit, the key group optimization
> can prevent a vertex from scaling down at all. It will hunt from the target
> upwards to maxParallelism/2, and will always find currentParallelism again.
>
> A simple test trying to scale down from a parallelism of 60 with a
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 360));
> {code}
>
> It seems reasonable to make a good attempt to spread data across subtasks,
> but not at the expense of total deadlock. The problem is that during scale-down
> it doesn't actually ensure that newParallelism will be < currentParallelism.
> The only workaround is to set a scale-down factor large enough that it finds
> the next lowest divisor of the maxParallelism.
>
> Clunky, but something to ensure it can make at least some progress. There is
> another test that now fails, but just to illustrate the point:
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
>     if ((scaleFactor < 1 && p < currentParallelism)
>             || (scaleFactor > 1 && p > currentParallelism)) {
>         if (maxParallelism % p == 0) {
>             return p;
>         }
>     }
> }
> {code}
>
> Perhaps this is by design and not a bug, but a total failure to scale down in
> order to keep key groups optimized does not seem ideal.
>
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]
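To make the key-distribution point above concrete, here is a small self-contained sketch (my own illustration, not operator code). It assumes Flink's key-group-to-subtask mapping of floor(keyGroup * parallelism / maxParallelism), as in KeyGroupRangeAssignment: when the parallelism divides maxParallelism evenly, every subtask receives the same number of key groups; otherwise some subtasks carry an extra key group, which at only a handful of key groups per subtask is a sizeable skew. A max parallelism such as 720 has many divisors, so the autoscaler has many evenly balanced parallelisms to pick from.

{code:java}
import java.util.stream.IntStream;

public class KeyGroupSkewSketch {

    // Key group -> subtask index, mirroring Flink's
    // KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup.
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Prints the smallest and largest number of key groups assigned to any subtask.
    static void printSpread(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int kg = 0; kg < maxParallelism; kg++) {
            counts[subtaskForKeyGroup(maxParallelism, parallelism, kg)]++;
        }
        int min = IntStream.of(counts).min().getAsInt();
        int max = IntStream.of(counts).max().getAsInt();
        System.out.printf("maxParallelism=%d, parallelism=%d -> %d..%d key groups per subtask%n",
                maxParallelism, parallelism, min, max);
    }

    public static void main(String[] args) {
        printSpread(120, 40); // 120 / 40 = 3: every subtask gets exactly 3 key groups
        printSpread(120, 48); // 120 / 48 = 2.5: subtasks get 2 or 3 -> some carry 50% more state
        printSpread(720, 48); // 720 / 48 = 15: even again, because 720 has many divisors
    }
}
{code}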
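Similarly, a simplified stand-in for the upward divisor hunt (again not the actual JobVertexScaler code, whose bounds and fallbacks differ) illustrates both the lower bound X described above and the behaviour reported in the ticket: starting from the throughput-derived target, the search only moves upwards, and in the reported scenario (current parallelism 60, scale factor 0.8, max parallelism 360) the first divisor of 360 at or above the target of 48 is 60 itself, so the vertex never scales down.

{code:java}
public class DivisorHuntSketch {

    // Simplified stand-in for the key group optimization: starting from the
    // throughput-derived target (the lower bound X), walk upwards looking for a
    // parallelism that evenly divides maxParallelism; fall back to the target if
    // none is found up to maxParallelism / 2.
    static int huntForDivisor(int target, int maxParallelism) {
        for (int p = target; p <= maxParallelism / 2; p++) {
            if (maxParallelism % p == 0) {
                return p;
            }
        }
        return target;
    }

    public static void main(String[] args) {
        int currentParallelism = 60;
        double scaleFactor = 0.8;   // a scale-down.max-factor of 0.2 caps the step at 60 * 0.8 = 48
        int maxParallelism = 360;

        int target = (int) Math.round(currentParallelism * scaleFactor); // 48

        // Prints "target=48 -> chosen parallelism=60": the first divisor of 360 at or
        // above 48 is 60, i.e. the current parallelism, so no scale-down takes place.
        System.out.println("target=" + target
                + " -> chosen parallelism=" + huntForDivisor(target, maxParallelism));
    }
}
{code}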