This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 95b1f76664b [Feature](executor)broker load support workload group
(#30866) (#31580)
95b1f76664b is described below
commit 95b1f76664b9c7ffd7788782d05dfa423d124d3c
Author: wangbo <[email protected]>
AuthorDate: Thu Feb 29 15:09:10 2024 +0800
[Feature](executor)broker load support workload group (#30866) (#31580)
---
.../java/org/apache/doris/load/loadv2/BrokerLoadJob.java | 1 +
.../src/main/java/org/apache/doris/load/loadv2/LoadJob.java | 7 ++++++-
.../java/org/apache/doris/load/loadv2/LoadLoadingTask.java | 12 ++++++++++++
.../main/java/org/apache/doris/load/loadv2/LoadManager.java | 8 +++++++-
.../src/main/java/org/apache/doris/qe/MultiLoadMgr.java | 2 +-
.../suites/load_p0/broker_load/test_s3_load.groovy | 4 ++++
6 files changed, 31 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 0cbb4e0cfb5..50e46fc383f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -223,6 +223,7 @@ public class BrokerLoadJob extends BulkLoadJob {
TUniqueId loadId = new
TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
task.init(loadId, attachment.getFileStatusByTable(aggKey),
attachment.getFileNumByTable(aggKey), getUserInfo());
+ task.settWorkloadGroups(tWorkloadGroups);
idToTasks.put(task.getSignature(), task);
// idToTasks contains previous LoadPendingTasks, so idToTasks
is just used to save all tasks.
// use newLoadingTasks to save new created loading tasks and
submit them later.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index c83ce68338f..4eb6be72795 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -52,6 +52,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TEtlState;
+import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.BeginTransactionException;
@@ -134,7 +135,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
protected String comment = "";
-
+ protected List<TPipelineWorkloadGroup> tWorkloadGroups = null;
public LoadJob(EtlJobType jobType) {
this.jobType = jobType;
@@ -1166,4 +1167,8 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
public LoadStatistic getLoadStatistic() {
return loadStatistic;
}
+
+ public void settWorkloadGroups(List<TPipelineWorkloadGroup>
tWorkloadGroups) {
+ this.tWorkloadGroups = tWorkloadGroups;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index a56ba1acd14..8fbabea8629 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -34,6 +34,7 @@ import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.ErrorTabletInfo;
@@ -79,6 +80,8 @@ public class LoadLoadingTask extends LoadTask {
private Profile jobProfile;
private long beginTime;
+ private List<TPipelineWorkloadGroup> tWorkloadGroups = null;
+
public LoadLoadingTask(Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean
isPartialUpdate,
@@ -164,6 +167,10 @@ public class LoadLoadingTask extends LoadTask {
int timeoutS = Math.max((int) (leftTimeMs / 1000), 1);
curCoordinator.setTimeout(timeoutS);
+ if (tWorkloadGroups != null) {
+ curCoordinator.setTWorkloadGroups(tWorkloadGroups);
+ }
+
try {
QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
actualExecute(curCoordinator, timeoutS);
@@ -221,4 +228,9 @@ public class LoadLoadingTask extends LoadTask {
this.loadId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
planner.updateLoadId(this.loadId);
}
+
+ void settWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
+ this.tWorkloadGroups = tWorkloadGroups;
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 1840494dcb2..a1de8e4405a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -115,7 +115,7 @@ public class LoadManager implements Writable {
/**
* This method will be invoked by the broker load(v2) now.
*/
- public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
+ public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException,
UserException {
Database database = checkDb(stmt.getLabel().getDbName());
long dbId = database.getId();
LoadJob loadJob;
@@ -144,6 +144,12 @@ public class LoadManager implements Writable {
} finally {
writeUnlock();
}
+
+ if (Config.enable_workload_group) {
+ loadJob.settWorkloadGroups(
+
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get()));
+ }
+
Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
// The job must be submitted after edit log.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index 2d1f512e29e..63f6de760cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -140,7 +140,7 @@ public class MultiLoadMgr {
// 'db' and 'label' form a multiLabel used to
// user can pass commitLabel which use this string commit to jobmgr
- public void commit(String fullDbName, String label) throws DdlException {
+ public void commit(String fullDbName, String label) throws DdlException,
UserException {
LabelName multiLabel = new LabelName(fullDbName, label);
List<Long> jobIds = Lists.newArrayList();
lock.writeLock().lock();
diff --git a/regression-test/suites/load_p0/broker_load/test_s3_load.groovy
b/regression-test/suites/load_p0/broker_load/test_s3_load.groovy
index 6312cc3feb7..3ebf4348dab 100644
--- a/regression-test/suites/load_p0/broker_load/test_s3_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_s3_load.groovy
@@ -17,6 +17,10 @@
suite("test_s3_load", "load_p0") {
+ sql "create workload group if not exists broker_load_test properties (
'cpu_share'='1024'); "
+
+ sql "set workload_group=broker_load_test;"
+
def tables = [
"agg_tbl_basic",
"dup_tbl_array",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]