gyfora commented on code in PR #875:
URL: https://github.com/apache/flink-kubernetes-operator/pull/875#discussion_r1738063957

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -131,65 +187,92 @@ public int computeScaleTargetParallelism(
                         Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
                         Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
 
-        if (newParallelism == currentParallelism
-                || blockScalingBasedOnPastActions(
-                        context,
-                        vertex,
-                        conf,
-                        evaluatedMetrics,
-                        history,
-                        currentParallelism,
-                        newParallelism)) {
-            return currentParallelism;
+        if (newParallelism == currentParallelism) {
+            // Clear delayed scale down request if the new parallelism is equal to
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+            return ParallelismResult.optional(currentParallelism);

Review Comment:
   Maybe we should have something like `ParallelismResult.noChange()` because this looks a bit weird :D
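For illustration, a minimal sketch of what such a factory might look like. Only `required(...)` and `optional(...)` appear in the PR diff; the fields, the constructor, and `noChange(...)` here are assumptions:

```java
// Sketch only: in the PR this is a static nested class of JobVertexScaler.
// The fields and constructor are assumed; noChange(...) is the suggestion,
// only required(...) and optional(...) exist in the diff.
public class ParallelismResult {
    private final int newParallelism;
    private final boolean required;

    private ParallelismResult(int newParallelism, boolean required) {
        this.newParallelism = newParallelism;
        this.required = required;
    }

    // Hypothetical factory: states "no scaling needed" directly instead of
    // wrapping the unchanged parallelism in optional(currentParallelism).
    public static ParallelismResult noChange(int currentParallelism) {
        return new ParallelismResult(currentParallelism, false);
    }

    public static ParallelismResult required(int newParallelism) {
        return new ParallelismResult(newParallelism, true);
    }

    public static ParallelismResult optional(int newParallelism) {
        return new ParallelismResult(newParallelism, false);
    }
}
```

The call site above would then read `return ParallelismResult.noChange(currentParallelism);`, making the intent explicit; whether the factory needs the parallelism argument at all is open.
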
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -131,65 +187,92 @@ public int computeScaleTargetParallelism(
                         Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
                         Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
 
-        if (newParallelism == currentParallelism
-                || blockScalingBasedOnPastActions(
-                        context,
-                        vertex,
-                        conf,
-                        evaluatedMetrics,
-                        history,
-                        currentParallelism,
-                        newParallelism)) {
-            return currentParallelism;
+        if (newParallelism == currentParallelism) {
+            // Clear delayed scale down request if the new parallelism is equal to
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+            return ParallelismResult.optional(currentParallelism);
         }
 
         // We record our expectations for this scaling operation
         evaluatedMetrics.put(
                 ScalingMetric.EXPECTED_PROCESSING_RATE,
                 EvaluatedScalingMetric.of(cappedTargetCapacity));
-        return newParallelism;
+
+        return detectBlockScaling(
+                context,
+                vertex,
+                conf,
+                evaluatedMetrics,
+                history,
+                currentParallelism,
+                newParallelism,
+                delayedScaleDown);
     }
 
-    private boolean blockScalingBasedOnPastActions(
+    private ParallelismResult detectBlockScaling(
             Context context,
             JobVertexID vertex,
             Configuration conf,
             Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
             SortedMap<Instant, ScalingSummary> history,
             int currentParallelism,
-            int newParallelism) {
-
-        // If we don't have past scaling actions for this vertex, there is nothing to do
-        if (history.isEmpty()) {
-            return false;
-        }
-
-        boolean scaledUp = currentParallelism < newParallelism;
-        var lastScalingTs = history.lastKey();
-        var lastSummary = history.get(lastScalingTs);
+            int newParallelism,
+            DelayedScaleDown delayedScaleDown) {
+        checkArgument(
+                currentParallelism != newParallelism,
+                "The newParallelism is equal to currentParallelism, no scaling is needed. This is probably a bug.");
+
+        var scaledUp = currentParallelism < newParallelism;
+
+        if (scaledUp) {
+            // Clear delayed scale down request if the new parallelism is greater than
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+
+            // If we don't have past scaling actions for this vertex, don't block scale up.
+            if (history.isEmpty()) {
+                return ParallelismResult.required(newParallelism);
+            }
 
-        if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
-            if (scaledUp) {
-                return detectIneffectiveScaleUp(
-                        context, vertex, conf, evaluatedMetrics, lastSummary);
-            } else {
-                return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
+            var lastSummary = history.get(history.lastKey());
+            if (currentParallelism == lastSummary.getNewParallelism()
+                    && lastSummary.isScaledUp()
+                    && detectIneffectiveScaleUp(
+                            context, vertex, conf, evaluatedMetrics, lastSummary)) {
+                // Block scale up when last rescale is ineffective.
+                return ParallelismResult.optional(currentParallelism);
            }
+
+            return ParallelismResult.required(newParallelism);
+        } else {
+            return detectImmediateScaleDown(delayedScaleDown, vertex, conf, newParallelism);
        }
-        return false;
    }
 
-    private boolean detectImmediateScaleDownAfterScaleUp(
-            JobVertexID vertex, Configuration conf, Instant lastScalingTs) {
+    private ParallelismResult detectImmediateScaleDown(
+            DelayedScaleDown delayedScaleDown,
+            JobVertexID vertex,
+            Configuration conf,
+            int newParallelism) {
+        var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
+        if (scaleDownInterval.toMillis() <= 0) {
+            // The scale down interval is disable, so don't block scaling.
+            return ParallelismResult.required(newParallelism);
+        }
+
+        var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
+        if (firstTriggerTime.isEmpty()) {
+            LOG.info("The scale down request is delayed for {}", vertex);

Review Comment:
   should we maybe log how long it is delayed?
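For instance, the message could carry the configured interval, which is already in scope at that point in the diff (a sketch, not the PR's code):

```java
// Sketch: surface how long the scale down will be delayed. scaleDownInterval
// is the SCALE_DOWN_INTERVAL duration read a few lines above in the diff.
LOG.info(
        "The scale down request for {} is delayed for {}",
        vertex,
        scaleDownInterval);
```
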
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -71,21 +74,74 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
         this.autoScalerEventHandler = autoScalerEventHandler;
     }
 
-    public int computeScaleTargetParallelism(
+    /**
+     * The rescaling will be triggered if any vertex's ParallelismResult is required. This means
+     * that if all vertices' ParallelismResult is optional, rescaling will be ignored.
+     */
+    @Getter
+    public static class ParallelismResult {

Review Comment:
   Should we call this `ParallelismChange` instead?

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -131,65 +187,92 @@ public int computeScaleTargetParallelism(
                         Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
                         Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
 
-        if (newParallelism == currentParallelism
-                || blockScalingBasedOnPastActions(
-                        context,
-                        vertex,
-                        conf,
-                        evaluatedMetrics,
-                        history,
-                        currentParallelism,
-                        newParallelism)) {
-            return currentParallelism;
+        if (newParallelism == currentParallelism) {
+            // Clear delayed scale down request if the new parallelism is equal to
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+            return ParallelismResult.optional(currentParallelism);
         }
 
         // We record our expectations for this scaling operation
         evaluatedMetrics.put(
                 ScalingMetric.EXPECTED_PROCESSING_RATE,
                 EvaluatedScalingMetric.of(cappedTargetCapacity));
-        return newParallelism;
+
+        return detectBlockScaling(
+                context,
+                vertex,
+                conf,
+                evaluatedMetrics,
+                history,
+                currentParallelism,
+                newParallelism,
+                delayedScaleDown);
     }
 
-    private boolean blockScalingBasedOnPastActions(
+    private ParallelismResult detectBlockScaling(
             Context context,
             JobVertexID vertex,
             Configuration conf,
             Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
             SortedMap<Instant, ScalingSummary> history,
             int currentParallelism,
-            int newParallelism) {
-
-        // If we don't have past scaling actions for this vertex, there is nothing to do
-        if (history.isEmpty()) {
-            return false;
-        }
-
-        boolean scaledUp = currentParallelism < newParallelism;
-        var lastScalingTs = history.lastKey();
-        var lastSummary = history.get(lastScalingTs);
+            int newParallelism,
+            DelayedScaleDown delayedScaleDown) {
+        checkArgument(
+                currentParallelism != newParallelism,
+                "The newParallelism is equal to currentParallelism, no scaling is needed. This is probably a bug.");
+
+        var scaledUp = currentParallelism < newParallelism;
+
+        if (scaledUp) {
+            // Clear delayed scale down request if the new parallelism is greater than
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+
+            // If we don't have past scaling actions for this vertex, don't block scale up.
+            if (history.isEmpty()) {
+                return ParallelismResult.required(newParallelism);
+            }
 
-        if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
-            if (scaledUp) {
-                return detectIneffectiveScaleUp(
-                        context, vertex, conf, evaluatedMetrics, lastSummary);
-            } else {
-                return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
+            var lastSummary = history.get(history.lastKey());
+            if (currentParallelism == lastSummary.getNewParallelism()
+                    && lastSummary.isScaledUp()
+                    && detectIneffectiveScaleUp(
+                            context, vertex, conf, evaluatedMetrics, lastSummary)) {
+                // Block scale up when last rescale is ineffective.
+                return ParallelismResult.optional(currentParallelism);
            }
+
+            return ParallelismResult.required(newParallelism);
+        } else {
+            return detectImmediateScaleDown(delayedScaleDown, vertex, conf, newParallelism);
        }
-        return false;
    }
 
-    private boolean detectImmediateScaleDownAfterScaleUp(
-            JobVertexID vertex, Configuration conf, Instant lastScalingTs) {
+    private ParallelismResult detectImmediateScaleDown(
+            DelayedScaleDown delayedScaleDown,
+            JobVertexID vertex,
+            Configuration conf,
+            int newParallelism) {
+        var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
+        if (scaleDownInterval.toMillis() <= 0) {
+            // The scale down interval is disable, so don't block scaling.
+            return ParallelismResult.required(newParallelism);
+        }
+
+        var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
+        if (firstTriggerTime.isEmpty()) {
+            LOG.info("The scale down request is delayed for {}", vertex);
+            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
+            return ParallelismResult.optional(newParallelism);
+        }
 
-        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
-        if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
-            LOG.info(
-                    "Skipping immediate scale down after scale up within grace period for {}",
-                    vertex);
-            return true;
+        if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
+            LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
+            return ParallelismResult.optional(newParallelism);

Review Comment:
   I think it may make sense to keep the previous logic. If you just had a scale up (you need to catch up) it's not good to scale down even if the scale down request was triggered a long time ago.

   I think we can simply use the old logic here and basically a scale up would "reset" the scale down first trigger time. By actually resetting it we may even be able to get rid of this method completely?
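A minimal sketch of that reset idea, assuming a scale up simply clears the vertex's first trigger time so a later scale-down request starts a fresh interval. The method names echo the diff, but the implementation itself is assumed, not the PR's:

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Sketch only: a minimal tracker showing how a scale up could "reset" the
// scale-down first trigger time, reproducing the old grace-period behaviour.
class DelayedScaleDownSketch<V> {
    private final Map<V, Instant> firstTriggerTimes = new HashMap<>();

    // Called on scale up (and when parallelism is unchanged): dropping the
    // entry restarts the scale-down interval for this vertex.
    void clearVertex(V vertex) {
        firstTriggerTimes.remove(vertex);
    }

    // True only once the scale-down request has been pending for the whole
    // interval; the first call after a reset records a fresh trigger time.
    boolean scaleDownReady(V vertex, Duration scaleDownInterval, Clock clock) {
        Instant now = clock.instant();
        Instant first = firstTriggerTimes.computeIfAbsent(vertex, v -> now);
        return !now.isBefore(first.plus(scaleDownInterval));
    }
}
```

With the reset living in `clearVertex`, the separate immediate-scale-down check may indeed become unnecessary, as the comment suggests.
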
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org