yiguolei commented on code in PR #37559:
URL: https://github.com/apache/doris/pull/37559#discussion_r1675411516


##########
fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.resource.workloadgroup.QueueToken;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TGetBeResourceRequest;
+import org.apache.doris.thrift.TGetBeResourceResult;
+import org.apache.doris.thrift.TGlobalResourceUsage;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class AdmissionControl extends MasterDaemon {
+
+    public static final Logger LOG = 
LogManager.getLogger(AdmissionControl.class);
+
+    private volatile boolean isAllBeMemoryEnough = true;
+
+    private SystemInfoService clusterInfoService;
+
+    public AdmissionControl(SystemInfoService clusterInfoService) {
+        super("get-be-resource-usage-thread", 
Config.get_be_resource_usage_interval_ms);
+        this.clusterInfoService = clusterInfoService;
+    }
+
+    private ConcurrentLinkedQueue<QueueToken> queryWaitList = new 
ConcurrentLinkedQueue<>();
+
+    public void addQueueToken(QueueToken queueToken) {
+        queryWaitList.offer(queueToken);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        double maxBeMemoryUsagePercent = getBeMemoryUsage();
+        notifyWaitMemoryQuery(maxBeMemoryUsagePercent);
+    }
+
+    public double getBeMemoryUsage() {
+        double maxBeMemoryUsagePercent = 0;
+        if (Config.query_queue_by_be_used_memory < 0) {
+            this.isAllBeMemoryEnough = true;
+            return maxBeMemoryUsagePercent;
+        }
+        Collection<Backend> backends = 
clusterInfoService.getIdToBackend().values();
+        boolean tmpIsAllBeMemoryEnough = true;
+        for (Backend be : backends) {
+            if (!be.isAlive()) {
+                continue;
+            }
+            TNetworkAddress address = null;
+            BackendService.Client client = null;
+            TGetBeResourceResult result = null;
+            boolean rpcOk = true;
+            try {
+                address = new TNetworkAddress(be.getHost(), be.getBePort());
+                client = ClientPool.backendPool.borrowObject(address, 5000);
+                result = client.getBeResource(new TGetBeResourceRequest());
+            } catch (Throwable t) {
+                rpcOk = false;
+                LOG.warn("get be {} resource failed, ", be.getHost(), t);
+            } finally {
+                try {
+                    if (rpcOk) {
+                        ClientPool.backendPool.returnObject(address, client);
+                    } else {
+                        ClientPool.backendPool.invalidateObject(address, 
client);
+                    }
+                } catch (Throwable e) {
+                    LOG.warn("return rpc client failed. related backend[{}]", 
be.getHost(),
+                            e);
+                }
+            }
+            if (result != null && result.isSetGlobalResourceUsage()) {
+                TGlobalResourceUsage globalResourceUsage = 
result.getGlobalResourceUsage();
+                if (globalResourceUsage != null && 
globalResourceUsage.isSetMemLimit()
+                        && globalResourceUsage.isSetMemUsage()) {
+                    long memUsageL = globalResourceUsage.getMemUsage();
+                    long memLimitL = globalResourceUsage.getMemLimit();
+                    double memUsage = 
Double.valueOf(String.valueOf(memUsageL));
+                    double memLimit = 
Double.valueOf(String.valueOf(memLimitL));
+                    double memUsagePercent = memUsage / memLimit;
+                    maxBeMemoryUsagePercent = Math.max(memUsagePercent, 
maxBeMemoryUsagePercent);
+
+                    if (memUsagePercent > 
Config.query_queue_by_be_used_memory) {
+                        tmpIsAllBeMemoryEnough = false;
+                    }
+                    LOG.debug("be ip:{}, mem limit:{}, mem usage:{}, mem usage 
percent:{}, query queue mem:{}",
+                            be.getHost(), memLimitL, memUsageL, 
memUsagePercent, Config.query_queue_by_be_used_memory);
+                }
+            }
+        }
+        this.isAllBeMemoryEnough = tmpIsAllBeMemoryEnough;
+        return maxBeMemoryUsagePercent;
+    }
+
+    private int getNotifyQueryCountByMemoryUsage(double 
maxBeMemoryUsagePercent) {
+        int count = Integer.MAX_VALUE;
+        if (maxBeMemoryUsagePercent > 0.9) {
+            count = 1;
+        } else if (maxBeMemoryUsagePercent > 0.8) {
+            count = 2;
+        } else if (maxBeMemoryUsagePercent > 0.7) {
+            count = 4;
+        } else if (maxBeMemoryUsagePercent > 0.6) {
+            count = 8;
+        }
+        return count;
+    }
+
+    public void notifyWaitMemoryQuery(double maxBeMemoryUsagePercent) {
+        if (!isAllBeMemoryEnough()) {
+            return;
+        }

Review Comment:
   这里只能计算出一个count 来,然后调用queue.notify 方法就行了。 能run 多少个,后续依靠checkResourceAvailable 
来判定。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to