mxm commented on code in PR #751:
URL: https://github.com/apache/flink-kubernetes-operator/pull/751#discussion_r1450588940


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/resources/ClusterResourceManager.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.resources;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.resources.ResourceCheck;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A cluster resource manager which provides a view over the allocatable resources within a
+ * Kubernetes cluster and allows simulating the scheduling of pods with a defined number of
+ * required resources.
+ *
+ * <p>The goal is to provide a good indicator for whether the resources needed for autoscaling
+ * are going to be available. This is achieved by pulling the node resource usage from the
+ * Kubernetes cluster at a regular, configurable interval, after which we use this data to
+ * simulate adding / removing resources (pods). Note that this is merely a (pretty good)
+ * heuristic because the Kubernetes scheduler has the final say. However, we prevent the vast
+ * majority of scenarios after pipeline outages which can lead to massive scale-up, where all
+ * pipelines may be scaled up at the same time and exhaust the available resources.
+ *
+ * <p>The simulation can run on a fixed set of Kubernetes nodes. Additionally, if we detect that
+ * the cluster is using the Kubernetes Cluster Autoscaler, we will use this data to extrapolate
+ * the number of nodes up to the maximum node count defined in the autoscaler configuration.
+ *
+ * <p>We currently track CPU and memory. Ephemeral storage is missing because there is no easy
+ * way to get node statistics on free storage.
+ */
+public class ClusterResourceManager implements ResourceCheck {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterResourceManager.class);
+
+    /** ConfigMap name of the Kubernetes Cluster Autoscaler. */
+    static final String CLUSTER_AUTOSCALER_CONFIG_MAP = "cluster-autoscaler-status";
+
+    /** EKS-specific node group information. Code still works without this label. */
+    static final String LABEL_NODE_GROUP = "eks.amazonaws.com/nodegroup";
+
+    private final Duration refreshInterval;
+    private final KubernetesClient kubernetesClient;
+
+    @VisibleForTesting ClusterResourceView clusterResourceView;
+
+    public ClusterResourceManager(Duration refreshInterval, KubernetesClient kubernetesClient) {
+        this.refreshInterval = refreshInterval;
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    @Override
+    public synchronized boolean trySchedule(
+            int currentInstances,
+            int newInstances,
+            double cpuPerInstance,
+            double memoryPerInstance) {
+
+        if (refreshInterval.isNegative()) {
+            // Feature disabled
+            return true;
+        }
+
+        if (shouldRefreshView(clusterResourceView, refreshInterval)) {
+            try {
+                clusterResourceView = createResourceView(kubernetesClient);
+            } catch (KubernetesClientException e) {
+                if (e.getCode() == 403) {
+                    LOG.warn(
+                            "No permission to retrieve node resource usage. Resource check disabled.");
+                    return true;
+                }
+                throw e;
+            }
+        }
+
+        if (newInstances <= currentInstances) {
+            LOG.debug("Same or fewer resources used after scaling.");
+            return true;
+        }
+
+        return trySchedule(
+                clusterResourceView,
+                currentInstances,
+                newInstances,
+                cpuPerInstance,
+                memoryPerInstance);
+    }
+
+    /**
+     * Simple check whether the new resource requirements can be scheduled. Note: This is not a
+     * full-blown scheduler. We may return false negatives, i.e. we may indicate scheduling is
+     * not possible when it actually is. This is still better than false positives, where we
+     * suffer from downtime due to non-schedulable pods.
+     */
+    private static boolean trySchedule(
+            ClusterResourceView resourceView,
+            int currentInstances,
+            int newInstances,
+            double cpuPerInstance,
+            double memoryPerInstance) {
+
+        resourceView.cancelPending();
+
+        for (int i = 0; i < currentInstances; i++) {
+            resourceView.release(cpuPerInstance, memoryPerInstance);

Review Comment:
   We do need the release logic; otherwise we would never be able to remove resources from the internal view on scale-downs, and other pipelines might then fail to be scheduled because the accounting would be incorrect.
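
   To make that concrete, here is a minimal, self-contained sketch of the release-then-reserve accounting. It deliberately ignores per-node bin packing and the Cluster Autoscaler extrapolation from this PR, and all names (`ReleaseThenReserveSketch`, `freeCpu`, `freeMemGb`) are made up for illustration; this is not the `ClusterResourceView` API.

   ```java
   /**
    * Illustration only: a toy "release then reserve" simulation over a single pool of free
    * resources. Not the actual ClusterResourceView implementation.
    */
   public class ReleaseThenReserveSketch {

       static double freeCpu = 4.0;    // simulated free CPU in the cluster
       static double freeMemGb = 16.0; // simulated free memory in GB

       /** Simulates scaling a deployment from currentInstances to newInstances. */
       static boolean trySchedule(
               int currentInstances, int newInstances, double cpuPerInstance, double memGbPerInstance) {
           // 1) Release what the current instances occupy, so scale-downs (and replacements)
           //    show up in the view; without this step the simulated usage only ever grows.
           double cpu = freeCpu + currentInstances * cpuPerInstance;
           double mem = freeMemGb + currentInstances * memGbPerInstance;
           // 2) Reserve resources for the new number of instances.
           cpu -= newInstances * cpuPerInstance;
           mem -= newInstances * memGbPerInstance;
           if (cpu < 0 || mem < 0) {
               return false; // not enough simulated capacity, reject the scale-up
           }
           freeCpu = cpu;
           freeMemGb = mem;
           return true;
       }

       public static void main(String[] args) {
           // 2 -> 4 instances of 1 CPU / 4 GB each: fits only because the 2 running
           // instances are released before the 4 new ones are reserved.
           System.out.println(trySchedule(2, 4, 1.0, 4.0)); // true
           // A second pipeline asking for 8 instances of 1 CPU / 4 GB no longer fits.
           System.out.println(trySchedule(0, 8, 1.0, 4.0)); // false
       }
   }
   ```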



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##########
@@ -103,6 +106,42 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
         }
     }
 
+    @Override
+    @SneakyThrows
+    protected Map<FlinkMetric, Metric> queryJmMetrics(Context ctx) {
+        Map<String, FlinkMetric> metrics =
+                Map.of(
+                        "numRegisteredTaskManagers", FlinkMetric.NUM_TASK_MANAGERS,
+                        "taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL,
+                        "taskSlotsAvailable", FlinkMetric.NUM_TASK_SLOTS_AVAILABLE);

Review Comment:
   Sure, but we will need taskSlotsAvailable and taskSlotsTotal to compute 
taskSlotsUsed. I've dropped the number of registered task managers.
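
   A sketch of that derived value, assuming the map returned by `queryJmMetrics` above and that the REST `Metric` type exposes its value as a string via `getValue()`; the helper itself is hypothetical and not part of the PR:

   ```java
   // Hypothetical helper: the used slot count is derived from the two collected metrics,
   // it is not reported directly by the JobManager.
   static double taskSlotsUsed(Map<FlinkMetric, Metric> jmMetrics) {
       double total = Double.parseDouble(jmMetrics.get(FlinkMetric.NUM_TASK_SLOTS_TOTAL).getValue());
       double available =
               Double.parseDouble(jmMetrics.get(FlinkMetric.NUM_TASK_SLOTS_AVAILABLE).getValue());
       return total - available;
   }
   ```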



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##########
@@ -122,6 +126,56 @@ CompletableFuture<P> sendRequest(
         assertEquals(pendingRecordsMetric.getSum(), 200);
     }
 
+    @Test
+    @Timeout(60)
+    public void testJmMetricCollection() throws Exception {
+        try (MiniCluster miniCluster =
+                new MiniCluster(
+                        new MiniClusterConfiguration.Builder()
+                                .setNumTaskManagers(1)
+                                .setNumSlotsPerTaskManager(3)
+                                .build())) {
+            miniCluster.start();
+            var client =
+                    new RestClusterClient<>(
+                            new Configuration(),
+                            "cluster",
+                            (c, e) ->
+                                    new StandaloneClientHAServices(
+                                            miniCluster.getRestAddress().get().toString()));
+            do {
+                var collector = new RestApiMetricsCollector<>();
+                Map<FlinkMetric, Metric> flinkMetricMetricMap =
+                        collector.queryJmMetrics(
+                                client,
+                                Map.of(
+                                        "numRegisteredTaskManagers", FlinkMetric.NUM_TASK_MANAGERS,
+                                        "taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL,
+                                        "taskSlotsAvailable", FlinkMetric.NUM_TASK_SLOTS_AVAILABLE));
+                try {
+                    System.out.println(flinkMetricMetricMap);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
