1996fanrui commented on code in PR #922:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/922#discussion_r1901559620


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -259,6 +220,9 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
                                 if (parallelismChange.isNoChange()) {
                                     return;
                                 }
+                                if 
(parallelismChange.isOutsideUtilizationBound()) {
+                                    anyVertexOutsideBound.set(true);
+                                }

Review Comment:
   We check the `OutsideUtilizationBound` of the final recommended parallelism 
instead of the `OutsideUtilizationBound` of the latest recommended parallelism.
   
   If the utilization of all tasks is within the bounds, we can skip scaling.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java:
##########
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.TestMetrics;
+import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.getRestClusterClientSupplier;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end test for {@link DelayedScaleDown}. */
+public class DelayedScaleDownEndToEndTest {
+
+    private static final int INITIAL_SOURCE_PARALLELISM = 200;
+    private static final int INITIAL_SINK_PARALLELISM = 1000;
+    private static final double UTILIZATION_TARGET = 0.8;
+
+    private JobAutoScalerContext<JobID> context;
+    private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> 
stateStore;
+
+    TestingScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer;
+
+    private TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>> 
metricsCollector;
+
+    private JobVertexID source, sink;
+
+    private JobAutoScalerImpl<JobID, JobAutoScalerContext<JobID>> autoscaler;
+    private Instant now;
+    private int expectedMetricSize;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        context = createDefaultJobAutoScalerContext();
+
+        TestingEventCollector<JobID, JobAutoScalerContext<JobID>> 
eventCollector =
+                new TestingEventCollector<>();
+        stateStore = new InMemoryAutoScalerStateStore<>();
+
+        source = new JobVertexID();
+        sink = new JobVertexID();
+
+        metricsCollector =
+                new TestingMetricsCollector<>(
+                        new JobTopology(
+                                new VertexInfo(source, Map.of(), 
INITIAL_SOURCE_PARALLELISM, 4000),
+                                new VertexInfo(
+                                        sink,
+                                        Map.of(source, REBALANCE),
+                                        INITIAL_SINK_PARALLELISM,
+                                        4000)));
+
+        var scaleDownInterval = 
Duration.ofMinutes(60).minus(Duration.ofSeconds(1));
+        // The metric window size is 9:59 to avoid other metrics are mixed.
+        var metricWindow = Duration.ofMinutes(10).minus(Duration.ofSeconds(1));
+
+        var defaultConf = context.getConfiguration();
+        defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL, 
Duration.ZERO);
+        defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(0));
+        defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, 
Duration.ofSeconds(0));
+        defaultConf.set(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10000);
+        defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) 
Integer.MAX_VALUE);
+        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 
UTILIZATION_TARGET);
+        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+        defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, 
scaleDownInterval);
+        defaultConf.set(AutoScalerOptions.METRICS_WINDOW, metricWindow);
+
+        scalingRealizer = new TestingScalingRealizer<>();
+        autoscaler =
+                new JobAutoScalerImpl<>(
+                        metricsCollector,
+                        new ScalingMetricEvaluator(),
+                        new ScalingExecutor<>(eventCollector, stateStore),
+                        eventCollector,
+                        scalingRealizer,
+                        stateStore);
+
+        // initially the last evaluated metrics are empty
+        
assertThat(autoscaler.lastEvaluatedMetrics.get(context.getJobKey())).isNull();
+
+        now = Instant.ofEpochMilli(0);
+        setClocksTo(now);
+        running(now);
+
+        metricsCollector.updateMetrics(source, buildMetric(0, 800));
+        metricsCollector.updateMetrics(sink, buildMetric(0, 800));
+
+        // the recommended parallelism values are empty initially
+        autoscaler.scale(context);
+        expectedMetricSize = 1;
+        assertCollectedMetricsSize(expectedMetricSize);
+    }
+
+    /**
+     * The scale down won't be executed before scale down interval window is 
full, and it will use
+     * the max recommended parallelism in the past scale down interval window 
size when scale down
+     * is executed.
+     */
+    @Test
+    void testDelayedScaleDownHappenInLastMetricWindow() throws Exception {
+        var sourceBusyList = List.of(800, 800, 800, 800, 800, 800, 800);
+        var sinkBusyList = List.of(350, 300, 150, 200, 400, 250, 100);
+
+        var metricWindowSize = sourceBusyList.size();
+
+        assertThat(metricWindowSize).isGreaterThan(6);
+
+        var totalRecords = 0L;
+        int recordsPerMinutes = 4800000;
+
+        for (int windowIndex = 0; windowIndex < metricWindowSize; 
windowIndex++) {
+            for (int i = 1; i <= 10; i++) {
+                now = now.plus(Duration.ofMinutes(1));
+                setClocksTo(now);
+
+                metricsCollector.updateMetrics(
+                        source, buildMetric(totalRecords, 
sourceBusyList.get(windowIndex)));
+                metricsCollector.updateMetrics(
+                        sink, buildMetric(totalRecords, 
sinkBusyList.get(windowIndex)));
+
+                autoscaler.scale(context);
+                // Metric window is 10 minutes, so 10 is the maximal metric 
size.
+                expectedMetricSize = Math.min(expectedMetricSize + 1, 10);
+                assertCollectedMetricsSize(expectedMetricSize);
+
+                // Assert the recommended parallelism.
+                if (windowIndex == metricWindowSize - 1 && i == 10) {
+                    // Last metric, we expect scale down is executed, and max 
recommended
+                    // parallelism in the past window should be used.
+                    // The max busy time needs more parallelism than others, 
so we could compute
+                    // parallelism based on the max busy time.
+                    var expectedSourceParallelism =
+                            getExpectedParallelism(sourceBusyList, 
INITIAL_SOURCE_PARALLELISM);
+                    var expectedSinkParallelism =
+                            getExpectedParallelism(sinkBusyList, 
INITIAL_SINK_PARALLELISM);
+                    pollAndAssertScalingRealizer(
+                            expectedSourceParallelism, 
expectedSinkParallelism);
+                } else {
+                    // Otherwise, scale down cannot be executed.
+                    if (windowIndex == 0 && i <= 9) {
+                        // Metric window is not full, so don't have 
recommended parallelism.
+                        assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM)).isNull();
+                        assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM)).isNull();
+                    } else {
+                        // Scale down won't be executed before scale down 
interval window is full.
+                        assertThat(getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM))
+                                .isEqualTo(INITIAL_SOURCE_PARALLELISM);
+                        assertThat(getCurrentMetricValue(sink, 
RECOMMENDED_PARALLELISM))
+                                .isEqualTo(INITIAL_SINK_PARALLELISM);
+                    }
+                    assertThat(scalingRealizer.events).isEmpty();
+                }
+
+                totalRecords += recordsPerMinutes;
+            }
+        }
+    }
+
+    private static List<List<Integer>>
+            
sinkParallelismMaxRecommendedParallelismWithinUtilizationBoundaryProvider() {
+        return List.of(
+                List.of(
+                        700, 690, 690, 690, 690, 690, 700, 690, 690, 690, 690, 
690, 700, 690, 690,
+                        690, 690, 690, 700, 690, 690, 690, 690, 690, 700, 690, 
690, 690, 690, 690,
+                        700),
+                List.of(700, 690, 700, 690, 700, 690, 700, 690, 700, 690, 700, 
690, 700, 690, 700),
+                List.of(
+                        790, 200, 200, 200, 200, 200, 790, 200, 200, 200, 200, 
200, 790, 200, 200,
+                        200, 200, 200, 790, 200, 200, 200, 200, 200, 790, 200, 
200, 200, 200, 200,
+                        790),
+                List.of(790, 200, 790, 200, 790, 200, 790, 200, 790, 200, 790, 
200, 790, 200, 790));

Review Comment:
   If the `RecommendedParallelism` class does not include 
`outsideUtilizationBound`, this test will fail for the last 2 inputs.
   
   The scale down interval is 60 minutes, and the metric window is 10 minutes, 
so the recommended parallelism will be 790 for the last 2 inputs. When we check 
`outsideUtilizationBound`, we need to check the `outsideUtilizationBound` 
corresponding to the parallelism of 790.
   
   That's why the `RecommendedParallelism` class includes 
`outsideUtilizationBound`. If it didn't include `outsideUtilizationBound`, the 
latest `outsideUtilizationBound` might come from the parallelism of 200.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to