gyfora commented on code in PR #875:
URL: https://github.com/apache/flink-kubernetes-operator/pull/875#discussion_r1738063957


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -131,65 +187,92 @@ public int computeScaleTargetParallelism(
                         Math.min(currentParallelism, 
conf.getInteger(VERTEX_MIN_PARALLELISM)),
                         Math.max(currentParallelism, 
conf.getInteger(VERTEX_MAX_PARALLELISM)));
 
-        if (newParallelism == currentParallelism
-                || blockScalingBasedOnPastActions(
-                        context,
-                        vertex,
-                        conf,
-                        evaluatedMetrics,
-                        history,
-                        currentParallelism,
-                        newParallelism)) {
-            return currentParallelism;
+        if (newParallelism == currentParallelism) {
+            // Clear delayed scale down request if the new parallelism is 
equal to
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+            return ParallelismResult.optional(currentParallelism);

Review Comment:
   Maybe we should have something like `ParallelismResult.noChange()`, because 
returning `optional(currentParallelism)` for the no-change case looks a bit weird :D 



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -131,65 +187,92 @@ public int computeScaleTargetParallelism(
                         Math.min(currentParallelism, 
conf.getInteger(VERTEX_MIN_PARALLELISM)),
                         Math.max(currentParallelism, 
conf.getInteger(VERTEX_MAX_PARALLELISM)));
 
-        if (newParallelism == currentParallelism
-                || blockScalingBasedOnPastActions(
-                        context,
-                        vertex,
-                        conf,
-                        evaluatedMetrics,
-                        history,
-                        currentParallelism,
-                        newParallelism)) {
-            return currentParallelism;
+        if (newParallelism == currentParallelism) {
+            // Clear delayed scale down request if the new parallelism is 
equal to
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+            return ParallelismResult.optional(currentParallelism);
         }
 
         // We record our expectations for this scaling operation
         evaluatedMetrics.put(
                 ScalingMetric.EXPECTED_PROCESSING_RATE,
                 EvaluatedScalingMetric.of(cappedTargetCapacity));
-        return newParallelism;
+
+        return detectBlockScaling(
+                context,
+                vertex,
+                conf,
+                evaluatedMetrics,
+                history,
+                currentParallelism,
+                newParallelism,
+                delayedScaleDown);
     }
 
-    private boolean blockScalingBasedOnPastActions(
+    private ParallelismResult detectBlockScaling(
             Context context,
             JobVertexID vertex,
             Configuration conf,
             Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
             SortedMap<Instant, ScalingSummary> history,
             int currentParallelism,
-            int newParallelism) {
-
-        // If we don't have past scaling actions for this vertex, there is 
nothing to do
-        if (history.isEmpty()) {
-            return false;
-        }
-
-        boolean scaledUp = currentParallelism < newParallelism;
-        var lastScalingTs = history.lastKey();
-        var lastSummary = history.get(lastScalingTs);
+            int newParallelism,
+            DelayedScaleDown delayedScaleDown) {
+        checkArgument(
+                currentParallelism != newParallelism,
+                "The newParallelism is equal to currentParallelism, no scaling 
is needed. This is probably a bug.");
+
+        var scaledUp = currentParallelism < newParallelism;
+
+        if (scaledUp) {
+            // Clear delayed scale down request if the new parallelism is 
greater than
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+
+            // If we don't have past scaling actions for this vertex, don't 
block scale up.
+            if (history.isEmpty()) {
+                return ParallelismResult.required(newParallelism);
+            }
 
-        if (currentParallelism == lastSummary.getNewParallelism() && 
lastSummary.isScaledUp()) {
-            if (scaledUp) {
-                return detectIneffectiveScaleUp(
-                        context, vertex, conf, evaluatedMetrics, lastSummary);
-            } else {
-                return detectImmediateScaleDownAfterScaleUp(vertex, conf, 
lastScalingTs);
+            var lastSummary = history.get(history.lastKey());
+            if (currentParallelism == lastSummary.getNewParallelism()
+                    && lastSummary.isScaledUp()
+                    && detectIneffectiveScaleUp(
+                            context, vertex, conf, evaluatedMetrics, 
lastSummary)) {
+                // Block scale up when last rescale is ineffective.
+                return ParallelismResult.optional(currentParallelism);
             }
+
+            return ParallelismResult.required(newParallelism);
+        } else {
+            return detectImmediateScaleDown(delayedScaleDown, vertex, conf, 
newParallelism);
         }
-        return false;
     }
 
-    private boolean detectImmediateScaleDownAfterScaleUp(
-            JobVertexID vertex, Configuration conf, Instant lastScalingTs) {
+    private ParallelismResult detectImmediateScaleDown(
+            DelayedScaleDown delayedScaleDown,
+            JobVertexID vertex,
+            Configuration conf,
+            int newParallelism) {
+        var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
+        if (scaleDownInterval.toMillis() <= 0) {
+            // The scale down interval is disable, so don't block scaling.
+            return ParallelismResult.required(newParallelism);
+        }
+
+        var firstTriggerTime = 
delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
+        if (firstTriggerTime.isEmpty()) {
+            LOG.info("The scale down request is delayed for {}", vertex);

Review Comment:
   Should we maybe log how long it is delayed?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -71,21 +74,74 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> 
autoScalerEventHandl
         this.autoScalerEventHandler = autoScalerEventHandler;
     }
 
-    public int computeScaleTargetParallelism(
+    /**
+     * The rescaling will be triggered if any vertex's ParallelismResult is 
required. This means
+     * that if all vertices' ParallelismResult is optional, rescaling will be 
ignored.
+     */
+    @Getter
+    public static class ParallelismResult {

Review Comment:
   Should we call this `ParallelismChange` instead?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -131,65 +187,92 @@ public int computeScaleTargetParallelism(
                         Math.min(currentParallelism, 
conf.getInteger(VERTEX_MIN_PARALLELISM)),
                         Math.max(currentParallelism, 
conf.getInteger(VERTEX_MAX_PARALLELISM)));
 
-        if (newParallelism == currentParallelism
-                || blockScalingBasedOnPastActions(
-                        context,
-                        vertex,
-                        conf,
-                        evaluatedMetrics,
-                        history,
-                        currentParallelism,
-                        newParallelism)) {
-            return currentParallelism;
+        if (newParallelism == currentParallelism) {
+            // Clear delayed scale down request if the new parallelism is 
equal to
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+            return ParallelismResult.optional(currentParallelism);
         }
 
         // We record our expectations for this scaling operation
         evaluatedMetrics.put(
                 ScalingMetric.EXPECTED_PROCESSING_RATE,
                 EvaluatedScalingMetric.of(cappedTargetCapacity));
-        return newParallelism;
+
+        return detectBlockScaling(
+                context,
+                vertex,
+                conf,
+                evaluatedMetrics,
+                history,
+                currentParallelism,
+                newParallelism,
+                delayedScaleDown);
     }
 
-    private boolean blockScalingBasedOnPastActions(
+    private ParallelismResult detectBlockScaling(
             Context context,
             JobVertexID vertex,
             Configuration conf,
             Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
             SortedMap<Instant, ScalingSummary> history,
             int currentParallelism,
-            int newParallelism) {
-
-        // If we don't have past scaling actions for this vertex, there is 
nothing to do
-        if (history.isEmpty()) {
-            return false;
-        }
-
-        boolean scaledUp = currentParallelism < newParallelism;
-        var lastScalingTs = history.lastKey();
-        var lastSummary = history.get(lastScalingTs);
+            int newParallelism,
+            DelayedScaleDown delayedScaleDown) {
+        checkArgument(
+                currentParallelism != newParallelism,
+                "The newParallelism is equal to currentParallelism, no scaling 
is needed. This is probably a bug.");
+
+        var scaledUp = currentParallelism < newParallelism;
+
+        if (scaledUp) {
+            // Clear delayed scale down request if the new parallelism is 
greater than
+            // currentParallelism.
+            delayedScaleDown.clearVertex(vertex);
+
+            // If we don't have past scaling actions for this vertex, don't 
block scale up.
+            if (history.isEmpty()) {
+                return ParallelismResult.required(newParallelism);
+            }
 
-        if (currentParallelism == lastSummary.getNewParallelism() && 
lastSummary.isScaledUp()) {
-            if (scaledUp) {
-                return detectIneffectiveScaleUp(
-                        context, vertex, conf, evaluatedMetrics, lastSummary);
-            } else {
-                return detectImmediateScaleDownAfterScaleUp(vertex, conf, 
lastScalingTs);
+            var lastSummary = history.get(history.lastKey());
+            if (currentParallelism == lastSummary.getNewParallelism()
+                    && lastSummary.isScaledUp()
+                    && detectIneffectiveScaleUp(
+                            context, vertex, conf, evaluatedMetrics, 
lastSummary)) {
+                // Block scale up when last rescale is ineffective.
+                return ParallelismResult.optional(currentParallelism);
             }
+
+            return ParallelismResult.required(newParallelism);
+        } else {
+            return detectImmediateScaleDown(delayedScaleDown, vertex, conf, 
newParallelism);
         }
-        return false;
     }
 
-    private boolean detectImmediateScaleDownAfterScaleUp(
-            JobVertexID vertex, Configuration conf, Instant lastScalingTs) {
+    private ParallelismResult detectImmediateScaleDown(
+            DelayedScaleDown delayedScaleDown,
+            JobVertexID vertex,
+            Configuration conf,
+            int newParallelism) {
+        var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
+        if (scaleDownInterval.toMillis() <= 0) {
+            // The scale down interval is disable, so don't block scaling.
+            return ParallelismResult.required(newParallelism);
+        }
+
+        var firstTriggerTime = 
delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
+        if (firstTriggerTime.isEmpty()) {
+            LOG.info("The scale down request is delayed for {}", vertex);
+            delayedScaleDown.updateTriggerTime(vertex, clock.instant());
+            return ParallelismResult.optional(newParallelism);
+        }
 
-        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
-        if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
-            LOG.info(
-                    "Skipping immediate scale down after scale up within grace 
period for {}",
-                    vertex);
-            return true;
+        if 
(clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
+            LOG.debug("Try to skip immediate scale down within scale-down 
interval for {}", vertex);
+            return ParallelismResult.optional(newParallelism);

Review Comment:
   I think it may make sense to keep the previous logic. If you have just had a 
scale up (and still need to catch up), it's not good to scale down, even if the 
scale down request was triggered a long time ago.
   
   I think we can simply use the old logic here, so that a scale up would 
basically "reset" the scale down first trigger time. By actually resetting it, 
we may even be able to get rid of this method completely.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to