1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355987715


##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Tests for {@link RestApiMetricsCollector}. */
+public class RestApiMetricsCollectorTest {
+
+    @Test
+    public void testAggregateMultiplePendingRecordsMetricsPerSource() throws 
Exception {
+        RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
+                new RestApiMetricsCollector<>();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        Map<String, FlinkMetric> flinkMetrics =
+                Map.of(
+                        "a.pendingRecords", FlinkMetric.PENDING_RECORDS,
+                        "b.pendingRecords", FlinkMetric.PENDING_RECORDS);
+        Map<JobVertexID, Map<String, FlinkMetric>> metrics = 
Map.of(jobVertexID, flinkMetrics);
+
+        List<AggregatedMetric> aggregatedMetricsResponse =
+                List.of(
+                        new AggregatedMetric(
+                                "a.pendingRecords", Double.NaN, Double.NaN, 
Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "b.pendingRecords", Double.NaN, Double.NaN, 
Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "c.unrelated", Double.NaN, Double.NaN, 
Double.NaN, 100.));
+
+        Configuration conf = new Configuration();
+        RestClusterClient<String> restClusterClient =
+                new RestClusterClient<>(
+                        conf,
+                        "test-cluster",
+                        (c, e) -> new StandaloneClientHAServices("localhost")) 
{
+                    @Override
+                    public <
+                                    M extends MessageHeaders<R, P, U>,
+                                    U extends MessageParameters,
+                                    R extends RequestBody,
+                                    P extends ResponseBody>
+                            CompletableFuture<P> sendRequest(
+                                    M messageHeaders, U messageParameters, R 
request) {
+                        if (messageHeaders instanceof 
AggregatedSubtaskMetricsHeaders) {
+                            return (CompletableFuture<P>)
+                                    CompletableFuture.completedFuture(
+                                            new AggregatedMetricsResponseBody(
+                                                    
aggregatedMetricsResponse));
+                        }
+                        return (CompletableFuture<P>)
+                                
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+                    }
+                };
+
+        JobID jobID = new JobID();
+        JobAutoScalerContext<JobID> context =
+                new JobAutoScalerContext<>(
+                        jobID,
+                        jobID,
+                        conf,
+                        new UnregisteredMetricsGroup(),
+                        () -> restClusterClient);
+
+        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap 
=
+                collector.queryAllAggregatedMetrics(context, metrics);
+
+        System.out.println(jobVertexIDMapMap);

Review Comment:
   I see, removed.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * Handler all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
+
+    /**
+     * Handle the event.
+     *
+     * @param interval When interval is great than 0, events that repeat 
within the interval will be
+     *     ignored.
+     */
+    void handleEvent(
+            Context context,
+            Type type,
+            String reason,
+            String message,
+            @Nullable String messageKey,
+            @Nullable Duration interval);
+
+    /** The type of the events. */
+    enum Type {
+        Normal,
+        Error

Review Comment:
   Sorry, I misunderstood your previous comment.
   
   I think 2 types is enough so far:
   - All events just use Normal and Error
   - `EventRecorder.Type` just has 2 types; if we have 3 types here, the 
Kubernetes event handler will have to convert the 3 types to EventRecorder's 2 types.
   
   I prefer to add the third type in the future if needed. Although this class 
is public, adding enum values doesn't cause any compatibility issues, so it 
should be very easy to add types later.
   
   I don't have a strong opinion here. I will decide whether to keep 2 or 3 types 
based on your feedback.
   
   Also, if you think 2 types are fine for this PR, I prefer to keep `Normal` and 
`Warning`.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.metrics;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nonnull;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** The utils for scaling history. */
+public class ScalingHistoryUtils {
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void 
addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+        addToScalingHistoryAndStore(
+                stateStore,
+                context,
+                getTrimmedScalingHistory(stateStore, context, now),
+                now,
+                summaries);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void 
addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+
+        summaries.forEach(
+                (id, summary) ->
+                        scalingHistory.computeIfAbsent(id, j -> new 
TreeMap<>()).put(now, summary));
+        stateStore.storeScalingHistory(context, scalingHistory);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void 
updateVertexList(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context ctx,
+            Instant now,
+            Set<JobVertexID> vertexSet)
+            throws Exception {
+        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
trimmedScalingHistory =
+                getTrimmedScalingHistory(stateStore, ctx, now);
+
+        if (trimmedScalingHistory.keySet().removeIf(v -> 
!vertexSet.contains(v))) {
+            stateStore.storeScalingHistory(ctx, trimmedScalingHistory);
+        }
+    }
+
+    @Nonnull
+    public static <KEY, Context extends JobAutoScalerContext<KEY>>
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
getTrimmedScalingHistory(
+                    AutoScalerStateStore<KEY, Context> autoScalerStateStore,
+                    Context context,
+                    Instant now)
+                    throws Exception {
+        var conf = context.getConfiguration();
+        return autoScalerStateStore
+                .getScalingHistory(context)
+                .map(
+                        scalingHistory -> {
+                            var entryIt = scalingHistory.entrySet().iterator();
+                            while (entryIt.hasNext()) {
+                                var entry = entryIt.next();
+                                // Limit how long past scaling decisions are 
remembered
+                                entry.setValue(
+                                        entry.getValue()
+                                                .tailMap(
+                                                        now.minus(
+                                                                conf.get(
+                                                                        
AutoScalerOptions
+                                                                               
 .VERTEX_SCALING_HISTORY_AGE))));

Review Comment:
   Thanks for the clarification. Updated it by creating a new TreeMap.
   
   



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
                     .stringType()
                     .asList()
                     .defaultValues()
+                    
.withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+    public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =
+            autoScalerConfig("flink.client.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The timeout for waiting the flink rest 
client to return.");

Review Comment:
   Do you mean moving the 
`KubernetesOperatorConfigOptions#OPERATOR_FLINK_CLIENT_TIMEOUT` here?
   
   It's a generic option for the Kubernetes operator; it's not only used by the 
autoscaler, but also by a lot of `AbstractFlinkService` methods. That's 
why I didn't move it here.
   
   WDYT?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements JobAutoScaler<KEY, Context> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = 
"AutoscalerError";
+
+    private final ScalingMetricCollector<KEY, Context> metricsCollector;
+    private final ScalingMetricEvaluator evaluator;
+    private final ScalingExecutor<KEY, Context> scalingExecutor;
+    private final AutoScalerEventHandler<KEY, Context> eventHandler;
+    private final ScalingRealizer<KEY, Context> scalingRealizer;
+    private final AutoScalerStateStore<KEY, Context> stateStore;
+
+    @VisibleForTesting
+    final Map<KEY, Map<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>>>
+            lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    final Map<KEY, AutoscalerFlinkMetrics> flinkMetrics = new 
ConcurrentHashMap<>();
+
+    public JobAutoScalerImpl(
+            ScalingMetricCollector<KEY, Context> metricsCollector,
+            ScalingMetricEvaluator evaluator,
+            ScalingExecutor<KEY, Context> scalingExecutor,
+            AutoScalerEventHandler<KEY, Context> eventHandler,
+            ScalingRealizer<KEY, Context> scalingRealizer,
+            AutoScalerStateStore<KEY, Context> stateStore) {
+        this.metricsCollector = metricsCollector;
+        this.evaluator = evaluator;
+        this.scalingExecutor = scalingExecutor;
+        this.eventHandler = eventHandler;
+        this.scalingRealizer = scalingRealizer;
+        this.stateStore = stateStore;
+    }
+
+    @Override
+    public void scale(Context ctx) throws Exception {
+        var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx);
+
+        try {
+            if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) {
+                LOG.debug("Autoscaler is disabled");
+                clearParallelismOverrides(ctx);
+                return;
+            }
+
+            if (ctx.getJobStatus() != JobStatus.RUNNING) {
+                lastEvaluatedMetrics.remove(ctx.getJobKey());
+                return;
+            }
+
+            runScalingLogic(ctx, autoscalerMetrics);
+            stateStore.flush(ctx);

Review Comment:
   I see these 2 `if` conditions don't call 
`autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());` in the master 
branch, while the subsequent code does call it.
   
   That's why I extracted these 2 conditions from the `runScalingLogic` method. 
Please correct me if I'm wrong, thanks!
   
   
   [1] 
https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java#L168
   [2] 
https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java#L176



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to