1996fanrui commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355987715
########## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; + 
+import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Tests for {@link RestApiMetricsCollector}. */ +public class RestApiMetricsCollectorTest { + + @Test + public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception { + RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector = + new RestApiMetricsCollector<>(); + + JobVertexID jobVertexID = new JobVertexID(); + Map<String, FlinkMetric> flinkMetrics = + Map.of( + "a.pendingRecords", FlinkMetric.PENDING_RECORDS, + "b.pendingRecords", FlinkMetric.PENDING_RECORDS); + Map<JobVertexID, Map<String, FlinkMetric>> metrics = Map.of(jobVertexID, flinkMetrics); + + List<AggregatedMetric> aggregatedMetricsResponse = + List.of( + new AggregatedMetric( + "a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.), + new AggregatedMetric( + "b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.), + new AggregatedMetric( + "c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.)); + + Configuration conf = new Configuration(); + RestClusterClient<String> restClusterClient = + new RestClusterClient<>( + conf, + "test-cluster", + (c, e) -> new StandaloneClientHAServices("localhost")) { + @Override + public < + M extends MessageHeaders<R, P, U>, + U extends MessageParameters, + R extends RequestBody, + P extends ResponseBody> + CompletableFuture<P> sendRequest( + M messageHeaders, U messageParameters, R request) { + if (messageHeaders instanceof AggregatedSubtaskMetricsHeaders) { + return (CompletableFuture<P>) + CompletableFuture.completedFuture( + new AggregatedMetricsResponseBody( + aggregatedMetricsResponse)); + } + return (CompletableFuture<P>) + CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + } + }; + + JobID jobID = new JobID(); + JobAutoScalerContext<JobID> context = + new JobAutoScalerContext<>( + jobID, + jobID, + conf, + new 
UnregisteredMetricsGroup(), + () -> restClusterClient); + + Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap = + collector.queryAllAggregatedMetrics(context, metrics); + + System.out.println(jobVertexIDMapMap); Review Comment: I see, removed. ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.autoscaler.JobAutoScalerContext; + +import javax.annotation.Nullable; + +import java.time.Duration; + +/** + * Handler all loggable events during scaling. + * + * @param <KEY> The job key. + * @param <Context> Instance of JobAutoScalerContext. + */ +@Experimental +public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> { + + /** + * Handle the event. + * + * @param interval When interval is great than 0, events that repeat within the interval will be + * ignored. 
+ */ + void handleEvent( + Context context, + Type type, + String reason, + String message, + @Nullable String messageKey, + @Nullable Duration interval); + + /** The type of the events. */ + enum Type { + Normal, + Error Review Comment: Sorry, I misunderstood your previous comment. I think 2 types is enough so far: - All events just use the Normal and Error - `EventRecorder.Type` just has 2 types; if we have 3 types here, the kubernetes event handler will convert 3 types to EventRecorder's 2 types. I prefer to add the third type in the future if needed. Although this class is public, adding enumerations doesn't cause any compatibility issues. So it should be very easy to add types. I don't have a strong opinion here. I will decide whether to keep 2 or 3 types based on your feedback. Also, if you think 2 types is fine in this PR, I prefer to keep `Normal` and `Warning`. ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.autoscaler.metrics; + +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import javax.annotation.Nonnull; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +/** The utils for scaling history. */ +public class ScalingHistoryUtils { + + public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore( + AutoScalerStateStore<KEY, Context> stateStore, + Context context, + Instant now, + Map<JobVertexID, ScalingSummary> summaries) + throws Exception { + addToScalingHistoryAndStore( + stateStore, + context, + getTrimmedScalingHistory(stateStore, context, now), + now, + summaries); + } + + public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore( + AutoScalerStateStore<KEY, Context> stateStore, + Context context, + Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory, + Instant now, + Map<JobVertexID, ScalingSummary> summaries) + throws Exception { + + summaries.forEach( + (id, summary) -> + scalingHistory.computeIfAbsent(id, j -> new TreeMap<>()).put(now, summary)); + stateStore.storeScalingHistory(context, scalingHistory); + } + + public static <KEY, Context extends JobAutoScalerContext<KEY>> void updateVertexList( + AutoScalerStateStore<KEY, Context> stateStore, + Context ctx, + Instant now, + Set<JobVertexID> vertexSet) + throws Exception { + Map<JobVertexID, SortedMap<Instant, ScalingSummary>> trimmedScalingHistory = + getTrimmedScalingHistory(stateStore, ctx, now); + + if (trimmedScalingHistory.keySet().removeIf(v -> !vertexSet.contains(v))) { + stateStore.storeScalingHistory(ctx, trimmedScalingHistory); 
+ } + } + + @Nonnull + public static <KEY, Context extends JobAutoScalerContext<KEY>> + Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getTrimmedScalingHistory( + AutoScalerStateStore<KEY, Context> autoScalerStateStore, + Context context, + Instant now) + throws Exception { + var conf = context.getConfiguration(); + return autoScalerStateStore + .getScalingHistory(context) + .map( + scalingHistory -> { + var entryIt = scalingHistory.entrySet().iterator(); + while (entryIt.hasNext()) { + var entry = entryIt.next(); + // Limit how long past scaling decisions are remembered + entry.setValue( + entry.getValue() + .tailMap( + now.minus( + conf.get( + AutoScalerOptions + .VERTEX_SCALING_HISTORY_AGE)))); Review Comment: Thanks for the clarification. Updated it by new a TreeMap. ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ########## @@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() + .withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); + + public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT = + autoScalerConfig("flink.client.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription("The timeout for waiting the flink rest client to return."); Review Comment: Do you mean moving the `KubernetesOperatorConfigOptions#OPERATOR_FLINK_CLIENT_TIMEOUT` here? It's a generic option for kubernetes operator, it's not only used for autoscaler, but also used for a lot of `AbstractFlinkService` methods. That's why I didn't move it here. WDYT? 
########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.realizer.ScalingRealizer; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism; +import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism; + +/** The default implementation of {@link JobAutoScaler}. */ +public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> + implements JobAutoScaler<KEY, Context> { + + private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class); + + @VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError"; + + private final ScalingMetricCollector<KEY, Context> metricsCollector; + private final ScalingMetricEvaluator evaluator; + private final ScalingExecutor<KEY, Context> scalingExecutor; + private final AutoScalerEventHandler<KEY, Context> eventHandler; + private final ScalingRealizer<KEY, Context> scalingRealizer; + private final AutoScalerStateStore<KEY, Context> stateStore; + + @VisibleForTesting + final Map<KEY, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>> + lastEvaluatedMetrics = new ConcurrentHashMap<>(); + + @VisibleForTesting + final Map<KEY, AutoscalerFlinkMetrics> flinkMetrics = new ConcurrentHashMap<>(); + + public JobAutoScalerImpl( + ScalingMetricCollector<KEY, Context> metricsCollector, + ScalingMetricEvaluator evaluator, + ScalingExecutor<KEY, Context> scalingExecutor, + AutoScalerEventHandler<KEY, Context> eventHandler, + ScalingRealizer<KEY, Context> scalingRealizer, + AutoScalerStateStore<KEY, Context> stateStore) { + this.metricsCollector = metricsCollector; + this.evaluator = evaluator; + this.scalingExecutor = scalingExecutor; + this.eventHandler = eventHandler; + this.scalingRealizer = scalingRealizer; + this.stateStore = stateStore; + } + + @Override + public void scale(Context ctx) throws Exception { + var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx); + + try { + if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) { + LOG.debug("Autoscaler is disabled"); + 
clearParallelismOverrides(ctx); + return; + } + + if (ctx.getJobStatus() != JobStatus.RUNNING) { + lastEvaluatedMetrics.remove(ctx.getJobKey()); + return; + } + + runScalingLogic(ctx, autoscalerMetrics); + stateStore.flush(ctx); Review Comment: I see these 2 `if` conditions don't call `autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());` in the master branch, while the subsequent code calls it. That's why I extracted these 2 conditions from the `runScalingLogic` method. Please correct me if I'm wrong, thanks! [1] https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java#L168 [2] https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java#L176 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org