mxm commented on code in PR #751: URL: https://github.com/apache/flink-kubernetes-operator/pull/751#discussion_r1450588940
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceManager.java: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.resources; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.resources.ResourceCheck; +import org.apache.flink.util.Preconditions; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A cluster resource manager which provides a view over the allocatable resources within a + * Kubernetes cluster and allows to simulate scheduling pods with a defined number of required + * 
resources. + * + * <p>The goal is to provide a good indicator for whether resources needed for autoscaling are going + * to be available. This is achieved by pulling the node resource usage from the Kubernetes cluster + * at a regular configurable interval, after which we use this data to simulate adding / removing + * resources (pods). Note that this is merely a (pretty good) heuristic because the Kubernetes + * scheduler has the final say. However, we prevent 99% of the scenarios after pipeline outages + * which can lead to massive scale up where all pipelines may be scaled up at the same time and + * exhaust the number of available resources. + * + * <p>The simulation can run on a fixed set of Kubernetes nodes. Additionally, if we detect that the + * cluster is using the Kubernetes Cluster Autoscaler, we will use this data to extrapolate the + * number of nodes to the maximum defined nodes in the autoscaler configuration. + * + * <p>We currently track CPU and memory. Ephemeral storage is missing because there is no easy way + * to get node statistics on free storage. + */ +public class ClusterResourceManager implements ResourceCheck { + + private static final Logger LOG = LoggerFactory.getLogger(ClusterResourceManager.class); + + /** ConfigMap name of the Kubernetes Cluster Autoscaler. */ + static final String CLUSTER_AUTOSCALER_CONFIG_MAP = "cluster-autoscaler-status"; + + /** EKS specific node group information. Code still works without this label. 
*/ + static final String LABEL_NODE_GROUP = "eks.amazonaws.com/nodegroup"; + + private final Duration refreshInterval; + private final KubernetesClient kubernetesClient; + + @VisibleForTesting ClusterResourceView clusterResourceView; + + public ClusterResourceManager(Duration refreshInterval, KubernetesClient kubernetesClient) { + this.refreshInterval = refreshInterval; + this.kubernetesClient = kubernetesClient; + } + + @Override + public synchronized boolean trySchedule( + int currentInstances, + int newInstances, + double cpuPerInstance, + double memoryPerInstance) { + + if (refreshInterval.isNegative()) { + // Feature disabled + return true; + } + + if (shouldRefreshView(clusterResourceView, refreshInterval)) { + try { + clusterResourceView = createResourceView(kubernetesClient); + } catch (KubernetesClientException e) { + if (e.getCode() == 403) { + LOG.warn( + "No permission to retrieve node resource usage. Resource check disabled."); + return true; + } + throw e; + } + } + + if (newInstances <= currentInstances) { + LOG.debug("Fewer or same amount of resources used after scaling."); + return true; + } + + return trySchedule( + clusterResourceView, + currentInstances, + newInstances, + cpuPerInstance, + memoryPerInstance); + } + + /** + * Simple check whether the new resource requirements can be scheduled. Note: This is not a + * full-blown scheduler. We may return false-negatives, i.e. we may indicate scheduling is not + * possible when it actually is. This is still better than false positives where we suffer from + * downtime due to non-schedulable pods. 
+ */ + private static boolean trySchedule( + ClusterResourceView resourceView, + int currentInstances, + int newInstances, + double cpuPerInstance, + double memoryPerInstance) { + + resourceView.cancelPending(); + + for (int i = 0; i < currentInstances; i++) { + resourceView.release(cpuPerInstance, memoryPerInstance); Review Comment: We do need the release logic because otherwise we would never be able to remove resources from the internal view for scale downs. That means other pipelines might not get scheduled because the accounting is incorrect. ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java: ########## @@ -103,6 +106,42 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics( } } + @Override + @SneakyThrows + protected Map<FlinkMetric, Metric> queryJmMetrics(Context ctx) { + Map<String, FlinkMetric> metrics = + Map.of( + "numRegisteredTaskManagers", FlinkMetric.NUM_TASK_MANAGERS, + "taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL, + "taskSlotsAvailable", FlinkMetric.NUM_TASK_SLOTS_AVAILABLE); Review Comment: Sure, but we will need taskSlotsAvailable and taskSlotsTotal to compute taskSlotsUsed. I've dropped the number of registered task managers. 
########## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java: ########## @@ -122,6 +126,56 @@ CompletableFuture<P> sendRequest( assertEquals(pendingRecordsMetric.getSum(), 200); } + @Test + @Timeout(60) + public void testJmMetricCollection() throws Exception { + try (MiniCluster miniCluster = + new MiniCluster( + new MiniClusterConfiguration.Builder() + .setNumTaskManagers(1) + .setNumSlotsPerTaskManager(3) + .build())) { + miniCluster.start(); + var client = + new RestClusterClient<>( + new Configuration(), + "cluster", + (c, e) -> + new StandaloneClientHAServices( + miniCluster.getRestAddress().get().toString())); + do { + var collector = new RestApiMetricsCollector<>(); + Map<FlinkMetric, Metric> flinkMetricMetricMap = + collector.queryJmMetrics( + client, + Map.of( + "numRegisteredTaskManagers", FlinkMetric.NUM_TASK_MANAGERS, + "taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL, + "taskSlotsAvailable", + FlinkMetric.NUM_TASK_SLOTS_AVAILABLE)); + try { + System.out.println(flinkMetricMetricMap); Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org