This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7a7292ad5a0 [branch-2.1][Refactor]use async to get be resource (#38389) (#39826)
7a7292ad5a0 is described below

commit 7a7292ad5a0e058b18295ce607bd68ed968cb66f
Author: wangbo <[email protected]>
AuthorDate: Fri Aug 23 17:16:19 2024 +0800

    [branch-2.1][Refactor]use async to get be resource (#38389) (#39826)
    
    pick #38389
---
 be/src/service/backend_service.cpp                 | 10 ---
 be/src/service/backend_service.h                   |  3 -
 be/src/service/internal_service.cpp                | 21 ++++++
 be/src/service/internal_service.h                  |  4 ++
 .../apache/doris/resource/AdmissionControl.java    | 82 ++++++++++------------
 .../org/apache/doris/rpc/BackendServiceClient.java |  5 ++
 .../org/apache/doris/rpc/BackendServiceProxy.java  | 11 +++
 .../org/apache/doris/common/GenericPoolTest.java   |  7 --
 .../apache/doris/utframe/MockedBackendFactory.java |  6 --
 gensrc/proto/internal_service.proto                | 14 ++++
 gensrc/thrift/BackendService.thrift                | 13 ----
 11 files changed, 92 insertions(+), 84 deletions(-)

diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index c30b936769a..0f9fd47948e 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -1111,14 +1111,4 @@ void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
     }
 }
 
-void BackendService::get_be_resource(TGetBeResourceResult& result,
-                                     const TGetBeResourceRequest& request) {
-    int64_t mem_usage = PerfCounters::get_vm_rss();
-    int64_t mem_limit = MemInfo::mem_limit();
-    TGlobalResourceUsage global_resource_usage;
-    global_resource_usage.__set_mem_limit(mem_limit);
-    global_resource_usage.__set_mem_usage(mem_usage);
-    result.__set_global_resource_usage(global_resource_usage);
-}
-
 } // namespace doris
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index bbcf103167f..9d53ec4bc45 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -140,9 +140,6 @@ public:
     void query_ingest_binlog(TQueryIngestBinlogResult& result,
                              const TQueryIngestBinlogRequest& request) 
override;
 
-    void get_be_resource(TGetBeResourceResult& result,
-                         const TGetBeResourceRequest& request) override;
-
 private:
     Status start_plan_fragment_execution(const TExecPlanFragmentParams& 
exec_params);
     ExecEnv* _exec_env = nullptr;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index ad8769566c0..0801f30fb2e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2328,4 +2328,25 @@ void PInternalServiceImpl::get_wal_queue_size(google::protobuf::RpcController* c
     }
 }
 
+void PInternalServiceImpl::get_be_resource(google::protobuf::RpcController* 
controller,
+                                           const PGetBeResourceRequest* 
request,
+                                           PGetBeResourceResponse* response,
+                                           google::protobuf::Closure* done) {
+    bool ret = _light_work_pool.try_offer([response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        int64_t mem_limit = MemInfo::mem_limit();
+        int64_t mem_usage = PerfCounters::get_vm_rss();
+
+        PGlobalResourceUsage* global_resource_usage = 
response->mutable_global_be_resource_usage();
+        global_resource_usage->set_mem_limit(mem_limit);
+        global_resource_usage->set_mem_usage(mem_usage);
+
+        Status st = Status::OK();
+        response->mutable_status()->set_status_code(st.code());
+    });
+    if (!ret) {
+        offer_failed(response, done, _light_work_pool);
+    }
+}
+
 } // namespace doris
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index abd6d6d1ca6..4bf09255ffb 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -269,6 +269,10 @@ private:
                                        PFetchColIdsResponse* response,
                                        google::protobuf::Closure* done);
 
+    void get_be_resource(google::protobuf::RpcController* controller,
+                         const PGetBeResourceRequest* request, 
PGetBeResourceResponse* response,
+                         google::protobuf::Closure* done) override;
+
 private:
     ExecEnv* _exec_env = nullptr;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
index 480afcde5b6..ad4e9f94c05 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
@@ -17,24 +17,26 @@
 
 package org.apache.doris.resource;
 
-import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Status;
 import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.proto.InternalService;
 import org.apache.doris.resource.workloadgroup.QueueToken;
+import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.BackendService;
-import org.apache.doris.thrift.TGetBeResourceRequest;
-import org.apache.doris.thrift.TGetBeResourceResult;
-import org.apache.doris.thrift.TGlobalResourceUsage;
-import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class AdmissionControl extends MasterDaemon {
 
@@ -71,54 +73,44 @@ public class AdmissionControl extends MasterDaemon {
         Collection<Backend> backends = 
clusterInfoService.getIdToBackend().values();
         this.currentMemoryLimit = Config.query_queue_by_be_used_memory;
         boolean tmpIsAllBeMemoryEnough = true;
+        List<Future<InternalService.PGetBeResourceResponse>> futureList = new 
ArrayList();
         for (Backend be : backends) {
             if (!be.isAlive()) {
                 continue;
             }
-            TNetworkAddress address = null;
-            BackendService.Client client = null;
-            TGetBeResourceResult result = null;
-            boolean rpcOk = true;
-            try {
-                address = new TNetworkAddress(be.getHost(), be.getBePort());
-                client = ClientPool.backendPool.borrowObject(address, 5000);
-                result = client.getBeResource(new TGetBeResourceRequest());
-            } catch (Throwable t) {
-                rpcOk = false;
-                LOG.warn("get be {} resource failed, ", be.getHost(), t);
-            } finally {
-                try {
-                    if (rpcOk) {
-                        ClientPool.backendPool.returnObject(address, client);
-                    } else {
-                        ClientPool.backendPool.invalidateObject(address, 
client);
-                    }
-                } catch (Throwable e) {
-                    LOG.warn("return rpc client failed. related backend[{}]", 
be.getHost(),
-                            e);
-                }
+            final InternalService.PGetBeResourceRequest request = 
InternalService.PGetBeResourceRequest.newBuilder()
+                    .build();
+            Future<InternalService.PGetBeResourceResponse> response = 
BackendServiceProxy.getInstance()
+                    .getBeResourceAsync(be.getBrpcAddress(), 5, request);
+            futureList.add(response);
+        }
+
+        for (Future<InternalService.PGetBeResourceResponse> future : 
futureList) {
+            if (future == null) {
+                continue;
             }
-            if (result != null && result.isSetGlobalResourceUsage()) {
-                TGlobalResourceUsage globalResourceUsage = 
result.getGlobalResourceUsage();
-                if (globalResourceUsage != null && 
globalResourceUsage.isSetMemLimit()
-                        && globalResourceUsage.isSetMemUsage()) {
-                    long memUsageL = globalResourceUsage.getMemUsage();
-                    long memLimitL = globalResourceUsage.getMemLimit();
-                    double memUsage = 
Double.valueOf(String.valueOf(memUsageL));
-                    double memLimit = 
Double.valueOf(String.valueOf(memLimitL));
-                    double memUsagePercent = memUsage / memLimit;
-
-                    if (memUsagePercent > this.currentMemoryLimit) {
-                        tmpIsAllBeMemoryEnough = false;
+            try {
+                InternalService.PGetBeResourceResponse response = 
future.get(5, TimeUnit.SECONDS);
+                if (response.hasStatus()) {
+                    Status status = new Status(response.getStatus());
+                    if (status.getErrorCode() == TStatusCode.OK) {
+                        InternalService.PGlobalResourceUsage globalUsage = 
response.getGlobalBeResourceUsage();
+                        long memUsageL = globalUsage.getMemUsage();
+                        long memLimitL = globalUsage.getMemLimit();
+                        double memUsage = 
Double.valueOf(String.valueOf(memUsageL));
+                        double memLimit = 
Double.valueOf(String.valueOf(memLimitL));
+                        double memUsagePercent = memUsage / memLimit;
+                        if (memUsagePercent > this.currentMemoryLimit) {
+                            tmpIsAllBeMemoryEnough = false;
+                            break;
+                        }
                     }
-                    LOG.debug(
-                            "be ip:{}, mem limit:{}, mem usage:{}, mem usage 
percent:{}, "
-                                    + "query queue mem:{}, query wait size:{}",
-                            be.getHost(), memLimitL, memUsageL, 
memUsagePercent, this.currentMemoryLimit,
-                            this.queryWaitQueue.size());
                 }
+            } catch (Throwable t) {
+                LOG.warn("wait get be resource response failed, ", t);
             }
         }
+
         this.isAllBeMemoryEnough = tmpIsAllBeMemoryEnough;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 924955e662c..9416687b587 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -184,6 +184,11 @@ public class BackendServiceClient {
     }
 
 
+    public Future<InternalService.PGetBeResourceResponse> 
getBeResource(InternalService.PGetBeResourceRequest request,
+            int timeoutSec) {
+        return stub.withDeadlineAfter(timeoutSec, 
TimeUnit.SECONDS).getBeResource(request);
+    }
+
     public void shutdown() {
         ConnectivityState state = channel.getState(false);
         LOG.warn("shut down backend service client: {}, channel state: {}", 
address, state);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 0fce50c327b..830e7da1a7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -556,5 +556,16 @@ public class BackendServiceProxy {
         }
     }
 
+    public Future<InternalService.PGetBeResourceResponse> 
getBeResourceAsync(TNetworkAddress address, int timeoutSec,
+            InternalService.PGetBeResourceRequest request) {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.getBeResource(request, timeoutSec);
+        } catch (Throwable e) {
+            LOG.warn("get be resource failed, address={}:{}",
+                    address.getHostname(), address.getPort(), e);
+        }
+        return null;
+    }
 
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index af921cd1821..d03d3595682 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -29,8 +29,6 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExecPlanFragmentResult;
 import org.apache.doris.thrift.TExportStatusResult;
 import org.apache.doris.thrift.TExportTaskRequest;
-import org.apache.doris.thrift.TGetBeResourceRequest;
-import org.apache.doris.thrift.TGetBeResourceResult;
 import org.apache.doris.thrift.TIngestBinlogRequest;
 import org.apache.doris.thrift.TIngestBinlogResult;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -150,11 +148,6 @@ public class GenericPoolTest {
             return null;
         }
 
-        @Override
-        public TGetBeResourceResult getBeResource(TGetBeResourceRequest 
request) throws TException {
-            return null;
-        }
-
         @Override
         public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) 
throws TException {
             // TODO Auto-generated method stub
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 0537e7fb7bf..93ce8b6766b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -48,8 +48,6 @@ import org.apache.doris.thrift.TExportState;
 import org.apache.doris.thrift.TExportStatusResult;
 import org.apache.doris.thrift.TExportTaskRequest;
 import org.apache.doris.thrift.TFinishTaskRequest;
-import org.apache.doris.thrift.TGetBeResourceRequest;
-import org.apache.doris.thrift.TGetBeResourceResult;
 import org.apache.doris.thrift.THeartbeatResult;
 import org.apache.doris.thrift.TIngestBinlogRequest;
 import org.apache.doris.thrift.TIngestBinlogResult;
@@ -387,10 +385,6 @@ public class MockedBackendFactory {
             return new TPublishTopicResult(new TStatus(TStatusCode.OK));
         }
 
-        @Override
-        public TGetBeResourceResult getBeResource(TGetBeResourceRequest 
request) throws TException {
-            return null;
-        }
 
         @Override
         public TStatus submitExportTask(TExportTaskRequest request) throws 
TException {
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 1d68d60aa61..c2ec3205541 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -899,6 +899,19 @@ message PFetchRemoteSchemaResponse {
     optional TabletSchemaPB merged_schema = 2;
 }
 
+message PGetBeResourceRequest {
+}
+
+message PGlobalResourceUsage {
+    optional int64 mem_limit = 1;
+    optional int64 mem_usage = 2;
+}
+
+message PGetBeResourceResponse {
+    optional PStatus status = 1;
+    optional PGlobalResourceUsage global_be_resource_usage = 2;
+}
+
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
     rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -947,5 +960,6 @@ service PBackendService {
     rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns 
(PFetchArrowFlightSchemaResult);
     rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns 
(PFetchRemoteSchemaResponse);
     rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns 
(PJdbcTestConnectionResult);
+    rpc get_be_resource(PGetBeResourceRequest) returns 
(PGetBeResourceResponse);
 };
 
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 0255d0d61a3..daf49a25200 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -243,18 +243,6 @@ struct TPublishTopicResult {
 }
 
 
-struct TGetBeResourceRequest {
-}
-
-struct TGlobalResourceUsage {
-    1: optional i64 mem_limit
-    2: optional i64 mem_usage
-}
-
-struct TGetBeResourceResult {
-    1: optional TGlobalResourceUsage global_resource_usage
-}
-
 service BackendService {
     // Called by coord to start asynchronous execution of plan fragment in 
backend.
     // Returns as soon as all incoming data streams have been set up.
@@ -312,5 +300,4 @@ service BackendService {
 
     TPublishTopicResult publish_topic_info(1:TPublishTopicRequest 
topic_request);
 
-    TGetBeResourceResult get_be_resource(1: TGetBeResourceRequest request);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to