This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7a7292ad5a0 [branch-2.1][Refactor]use async to get be resource (#38389) (#39826)
7a7292ad5a0 is described below
commit 7a7292ad5a0e058b18295ce607bd68ed968cb66f
Author: wangbo <[email protected]>
AuthorDate: Fri Aug 23 17:16:19 2024 +0800
[branch-2.1][Refactor]use async to get be resource (#38389) (#39826)
pick #38389
---
be/src/service/backend_service.cpp | 10 ---
be/src/service/backend_service.h | 3 -
be/src/service/internal_service.cpp | 21 ++++++
be/src/service/internal_service.h | 4 ++
.../apache/doris/resource/AdmissionControl.java | 82 ++++++++++------------
.../org/apache/doris/rpc/BackendServiceClient.java | 5 ++
.../org/apache/doris/rpc/BackendServiceProxy.java | 11 +++
.../org/apache/doris/common/GenericPoolTest.java | 7 --
.../apache/doris/utframe/MockedBackendFactory.java | 6 --
gensrc/proto/internal_service.proto | 14 ++++
gensrc/thrift/BackendService.thrift | 13 ----
11 files changed, 92 insertions(+), 84 deletions(-)
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index c30b936769a..0f9fd47948e 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -1111,14 +1111,4 @@ void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
}
}
-void BackendService::get_be_resource(TGetBeResourceResult& result,
- const TGetBeResourceRequest& request) {
- int64_t mem_usage = PerfCounters::get_vm_rss();
- int64_t mem_limit = MemInfo::mem_limit();
- TGlobalResourceUsage global_resource_usage;
- global_resource_usage.__set_mem_limit(mem_limit);
- global_resource_usage.__set_mem_usage(mem_usage);
- result.__set_global_resource_usage(global_resource_usage);
-}
-
} // namespace doris
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index bbcf103167f..9d53ec4bc45 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -140,9 +140,6 @@ public:
void query_ingest_binlog(TQueryIngestBinlogResult& result,
const TQueryIngestBinlogRequest& request)
override;
- void get_be_resource(TGetBeResourceResult& result,
- const TGetBeResourceRequest& request) override;
-
private:
Status start_plan_fragment_execution(const TExecPlanFragmentParams&
exec_params);
ExecEnv* _exec_env = nullptr;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index ad8769566c0..0801f30fb2e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2328,4 +2328,25 @@ void PInternalServiceImpl::get_wal_queue_size(google::protobuf::RpcController* c
}
}
+void PInternalServiceImpl::get_be_resource(google::protobuf::RpcController*
controller,
+ const PGetBeResourceRequest*
request,
+ PGetBeResourceResponse* response,
+ google::protobuf::Closure* done) {
+ bool ret = _light_work_pool.try_offer([response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ int64_t mem_limit = MemInfo::mem_limit();
+ int64_t mem_usage = PerfCounters::get_vm_rss();
+
+ PGlobalResourceUsage* global_resource_usage =
response->mutable_global_be_resource_usage();
+ global_resource_usage->set_mem_limit(mem_limit);
+ global_resource_usage->set_mem_usage(mem_usage);
+
+ Status st = Status::OK();
+ response->mutable_status()->set_status_code(st.code());
+ });
+ if (!ret) {
+ offer_failed(response, done, _light_work_pool);
+ }
+}
+
} // namespace doris
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index abd6d6d1ca6..4bf09255ffb 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -269,6 +269,10 @@ private:
PFetchColIdsResponse* response,
google::protobuf::Closure* done);
+ void get_be_resource(google::protobuf::RpcController* controller,
+ const PGetBeResourceRequest* request,
PGetBeResourceResponse* response,
+ google::protobuf::Closure* done) override;
+
private:
ExecEnv* _exec_env = nullptr;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
index 480afcde5b6..ad4e9f94c05 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
@@ -17,24 +17,26 @@
package org.apache.doris.resource;
-import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
+import org.apache.doris.common.Status;
import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.proto.InternalService;
import org.apache.doris.resource.workloadgroup.QueueToken;
+import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.BackendService;
-import org.apache.doris.thrift.TGetBeResourceRequest;
-import org.apache.doris.thrift.TGetBeResourceResult;
-import org.apache.doris.thrift.TGlobalResourceUsage;
-import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
public class AdmissionControl extends MasterDaemon {
@@ -71,54 +73,44 @@ public class AdmissionControl extends MasterDaemon {
Collection<Backend> backends =
clusterInfoService.getIdToBackend().values();
this.currentMemoryLimit = Config.query_queue_by_be_used_memory;
boolean tmpIsAllBeMemoryEnough = true;
+ List<Future<InternalService.PGetBeResourceResponse>> futureList = new
ArrayList();
for (Backend be : backends) {
if (!be.isAlive()) {
continue;
}
- TNetworkAddress address = null;
- BackendService.Client client = null;
- TGetBeResourceResult result = null;
- boolean rpcOk = true;
- try {
- address = new TNetworkAddress(be.getHost(), be.getBePort());
- client = ClientPool.backendPool.borrowObject(address, 5000);
- result = client.getBeResource(new TGetBeResourceRequest());
- } catch (Throwable t) {
- rpcOk = false;
- LOG.warn("get be {} resource failed, ", be.getHost(), t);
- } finally {
- try {
- if (rpcOk) {
- ClientPool.backendPool.returnObject(address, client);
- } else {
- ClientPool.backendPool.invalidateObject(address,
client);
- }
- } catch (Throwable e) {
- LOG.warn("return rpc client failed. related backend[{}]",
be.getHost(),
- e);
- }
+ final InternalService.PGetBeResourceRequest request =
InternalService.PGetBeResourceRequest.newBuilder()
+ .build();
+ Future<InternalService.PGetBeResourceResponse> response =
BackendServiceProxy.getInstance()
+ .getBeResourceAsync(be.getBrpcAddress(), 5, request);
+ futureList.add(response);
+ }
+
+ for (Future<InternalService.PGetBeResourceResponse> future :
futureList) {
+ if (future == null) {
+ continue;
}
- if (result != null && result.isSetGlobalResourceUsage()) {
- TGlobalResourceUsage globalResourceUsage =
result.getGlobalResourceUsage();
- if (globalResourceUsage != null &&
globalResourceUsage.isSetMemLimit()
- && globalResourceUsage.isSetMemUsage()) {
- long memUsageL = globalResourceUsage.getMemUsage();
- long memLimitL = globalResourceUsage.getMemLimit();
- double memUsage =
Double.valueOf(String.valueOf(memUsageL));
- double memLimit =
Double.valueOf(String.valueOf(memLimitL));
- double memUsagePercent = memUsage / memLimit;
-
- if (memUsagePercent > this.currentMemoryLimit) {
- tmpIsAllBeMemoryEnough = false;
+ try {
+ InternalService.PGetBeResourceResponse response =
future.get(5, TimeUnit.SECONDS);
+ if (response.hasStatus()) {
+ Status status = new Status(response.getStatus());
+ if (status.getErrorCode() == TStatusCode.OK) {
+ InternalService.PGlobalResourceUsage globalUsage =
response.getGlobalBeResourceUsage();
+ long memUsageL = globalUsage.getMemUsage();
+ long memLimitL = globalUsage.getMemLimit();
+ double memUsage =
Double.valueOf(String.valueOf(memUsageL));
+ double memLimit =
Double.valueOf(String.valueOf(memLimitL));
+ double memUsagePercent = memUsage / memLimit;
+ if (memUsagePercent > this.currentMemoryLimit) {
+ tmpIsAllBeMemoryEnough = false;
+ break;
+ }
}
- LOG.debug(
- "be ip:{}, mem limit:{}, mem usage:{}, mem usage
percent:{}, "
- + "query queue mem:{}, query wait size:{}",
- be.getHost(), memLimitL, memUsageL,
memUsagePercent, this.currentMemoryLimit,
- this.queryWaitQueue.size());
}
+ } catch (Throwable t) {
+ LOG.warn("wait get be resource response failed, ", t);
}
}
+
this.isAllBeMemoryEnough = tmpIsAllBeMemoryEnough;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 924955e662c..9416687b587 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -184,6 +184,11 @@ public class BackendServiceClient {
}
+ public Future<InternalService.PGetBeResourceResponse>
getBeResource(InternalService.PGetBeResourceRequest request,
+ int timeoutSec) {
+ return stub.withDeadlineAfter(timeoutSec,
TimeUnit.SECONDS).getBeResource(request);
+ }
+
public void shutdown() {
ConnectivityState state = channel.getState(false);
LOG.warn("shut down backend service client: {}, channel state: {}",
address, state);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 0fce50c327b..830e7da1a7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -556,5 +556,16 @@ public class BackendServiceProxy {
}
}
+ public Future<InternalService.PGetBeResourceResponse>
getBeResourceAsync(TNetworkAddress address, int timeoutSec,
+ InternalService.PGetBeResourceRequest request) {
+ try {
+ final BackendServiceClient client = getProxy(address);
+ return client.getBeResource(request, timeoutSec);
+ } catch (Throwable e) {
+ LOG.warn("get be resource failed, address={}:{}",
+ address.getHostname(), address.getPort(), e);
+ }
+ return null;
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index af921cd1821..d03d3595682 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -29,8 +29,6 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentResult;
import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
-import org.apache.doris.thrift.TGetBeResourceRequest;
-import org.apache.doris.thrift.TGetBeResourceResult;
import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TNetworkAddress;
@@ -150,11 +148,6 @@ public class GenericPoolTest {
return null;
}
- @Override
- public TGetBeResourceResult getBeResource(TGetBeResourceRequest
request) throws TException {
- return null;
- }
-
@Override
public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest)
throws TException {
// TODO Auto-generated method stub
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 0537e7fb7bf..93ce8b6766b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -48,8 +48,6 @@ import org.apache.doris.thrift.TExportState;
import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
import org.apache.doris.thrift.TFinishTaskRequest;
-import org.apache.doris.thrift.TGetBeResourceRequest;
-import org.apache.doris.thrift.TGetBeResourceResult;
import org.apache.doris.thrift.THeartbeatResult;
import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
@@ -387,10 +385,6 @@ public class MockedBackendFactory {
return new TPublishTopicResult(new TStatus(TStatusCode.OK));
}
- @Override
- public TGetBeResourceResult getBeResource(TGetBeResourceRequest
request) throws TException {
- return null;
- }
@Override
public TStatus submitExportTask(TExportTaskRequest request) throws
TException {
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 1d68d60aa61..c2ec3205541 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -899,6 +899,19 @@ message PFetchRemoteSchemaResponse {
optional TabletSchemaPB merged_schema = 2;
}
+message PGetBeResourceRequest {
+}
+
+message PGlobalResourceUsage {
+ optional int64 mem_limit = 1;
+ optional int64 mem_usage = 2;
+}
+
+message PGetBeResourceResponse {
+ optional PStatus status = 1;
+ optional PGlobalResourceUsage global_be_resource_usage = 2;
+}
+
service PBackendService {
rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -947,5 +960,6 @@ service PBackendService {
rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns
(PFetchArrowFlightSchemaResult);
rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns
(PFetchRemoteSchemaResponse);
rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns
(PJdbcTestConnectionResult);
+ rpc get_be_resource(PGetBeResourceRequest) returns
(PGetBeResourceResponse);
};
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 0255d0d61a3..daf49a25200 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -243,18 +243,6 @@ struct TPublishTopicResult {
}
-struct TGetBeResourceRequest {
-}
-
-struct TGlobalResourceUsage {
- 1: optional i64 mem_limit
- 2: optional i64 mem_usage
-}
-
-struct TGetBeResourceResult {
- 1: optional TGlobalResourceUsage global_resource_usage
-}
-
service BackendService {
// Called by coord to start asynchronous execution of plan fragment in
backend.
// Returns as soon as all incoming data streams have been set up.
@@ -312,5 +300,4 @@ service BackendService {
TPublishTopicResult publish_topic_info(1:TPublishTopicRequest
topic_request);
- TGetBeResourceResult get_be_resource(1: TGetBeResourceRequest request);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]