This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 086236ed245 [Improment]publish workload to BE by tag (#38486) 086236ed245 is described below commit 086236ed245a403cb8a7b0c2af9ef24d6801c3c6 Author: wangbo <wan...@apache.org> AuthorDate: Wed Jul 31 18:29:49 2024 +0800 [Improment]publish workload to BE by tag (#38486) ## Proposed changes A workload group's tag property falls into one of three cases: 1 an empty string (null or ''): the workload group can be published to all BEs. 2 a value that matches some BE's location: the workload group can only be published to the BEs with the same tag. 3 a non-empty but invalid string that cannot match any BE's location: the workload group cannot be published to any BE. --- .../doris/common/publish/TopicPublisherThread.java | 28 +++++++++++++++- .../main/java/org/apache/doris/resource/Tag.java | 2 ++ .../resource/workloadgroup/WorkloadGroup.java | 24 +++++++++++--- .../resource/workloadgroup/WorkloadGroupMgr.java | 3 ++ .../main/java/org/apache/doris/system/Backend.java | 37 ++++++++++++++++++++++ .../workloadgroup/WorkloadGroupMgrTest.java | 13 ++++++-- gensrc/thrift/BackendService.thrift | 1 + 7 files changed, 101 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java index f9fdc808498..74cefeca4d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java @@ -27,6 +27,7 @@ import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPublishTopicRequest; import org.apache.doris.thrift.TTopicInfoType; +import org.apache.doris.thrift.TWorkloadGroupInfo; import org.apache.doris.thrift.TopicInfo; import org.apache.logging.log4j.LogManager; @@ -35,8 +36,10 @@ import 
org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; public class TopicPublisherThread extends MasterDaemon { @@ -126,7 +129,30 @@ public class TopicPublisherThread extends MasterDaemon { try { address = new TNetworkAddress(be.getHost(), be.getBePort()); client = ClientPool.backendPool.borrowObject(address); - client.publishTopicInfo(request); + // check whether workload group tag math current be + TPublishTopicRequest copiedRequest = request.deepCopy(); + if (copiedRequest.isSetTopicMap()) { + Map<TTopicInfoType, List<TopicInfo>> topicMap = copiedRequest.getTopicMap(); + List<TopicInfo> topicInfoList = topicMap.get(TTopicInfoType.WORKLOAD_GROUP); + if (topicInfoList != null) { + Set<String> beTagSet = be.getBeWorkloadGroupTagSet(); + Iterator<TopicInfo> topicIter = topicInfoList.iterator(); + while (topicIter.hasNext()) { + TopicInfo topicInfo = topicIter.next(); + if (topicInfo.isSetWorkloadGroupInfo()) { + TWorkloadGroupInfo tWgInfo = topicInfo.getWorkloadGroupInfo(); + if (tWgInfo.isSetTag() && !Backend.isMatchWorkloadGroupTag( + tWgInfo.getTag(), beTagSet)) { + // currently TopicInfo could not contain both policy and workload group, + // so we can remove TopicInfo directly. 
+ topicIter.remove(); + } + } + } + } + } + + client.publishTopicInfo(copiedRequest); ok = true; LOG.info("[topic_publish]publish topic info to be {} success, time cost={} ms, details:{}", be.getHost(), (System.currentTimeMillis() - beginTime), logStr); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java index 9353140b9d4..a51755412b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java @@ -73,6 +73,8 @@ public class Tag implements Writable { public static final String CLOUD_CLUSTER_PRIVATE_ENDPOINT = "cloud_cluster_private_endpoint"; public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status"; + public static final String WORKLOAD_GROUP = "workload_group"; + public static final ImmutableSet<String> RESERVED_TAG_TYPE = ImmutableSet.of( TYPE_ROLE, TYPE_FUNCTION, TYPE_LOCATION); public static final ImmutableSet<String> RESERVED_TAG_VALUES = ImmutableSet.of( diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index cae1b25a41f..c6d7aa923b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -18,8 +18,10 @@ package org.apache.doris.resource.workloadgroup; import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -30,7 +32,6 @@ import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TWorkloadGroupInfo; import 
org.apache.doris.thrift.TopicInfo; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; @@ -189,9 +190,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { throws DdlException { Map<String, String> newProperties = new HashMap<>(currentWorkloadGroup.getProperties()); for (Map.Entry<String, String> kv : updateProperties.entrySet()) { - if (!Strings.isNullOrEmpty(kv.getValue())) { - newProperties.put(kv.getKey(), kv.getValue()); - } + newProperties.put(kv.getKey(), kv.getValue()); } checkProperties(newProperties); @@ -416,6 +415,18 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { } } + String tagStr = properties.get(TAG); + if (!StringUtils.isEmpty(tagStr)) { + String[] tagArr = tagStr.split(","); + for (String tag : tagArr) { + try { + FeNameFormat.checkCommonName("workload group tag name", tag); + } catch (AnalysisException e) { + throw new DdlException("workload group tag name format is illegal, " + tagStr); + } + } + } + } public long getId() { @@ -605,6 +616,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { tWorkloadGroupInfo.setRemoteReadBytesPerSecond(Long.valueOf(remoteReadBytesPerSecStr)); } + String tagStr = properties.get(TAG); + if (!StringUtils.isEmpty(tagStr)) { + tWorkloadGroupInfo.setTag(tagStr); + } + TopicInfo topicInfo = new TopicInfo(); topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo); return topicInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 323ecffc2f2..5ddc5fb68f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -394,6 +394,9 @@ public class 
WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException { String workloadGroupName = stmt.getWorkloadGroupName(); Map<String, String> properties = stmt.getProperties(); + if (properties.size() == 0) { + throw new DdlException("alter workload group should contain at least one property"); + } WorkloadGroup newWorkloadGroup; writeLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index a366aca5d6b..876e6ca40b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -38,7 +38,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,9 +48,12 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -932,4 +937,36 @@ public class Backend implements Writable { this.lastPublishTaskAccumulatedNum = accumulatedNum; } + public Set<String> getBeWorkloadGroupTagSet() { + Set<String> beTagSet = Sets.newHashSet(); + String beTagStr = this.tagMap.get(Tag.WORKLOAD_GROUP); + if (StringUtils.isEmpty(beTagStr)) { + return beTagSet; + } + + String[] beTagArr = beTagStr.split(","); + for (String beTag : beTagArr) { + beTagSet.add(beTag.trim()); + } + + return beTagSet; + } + + public static boolean 
isMatchWorkloadGroupTag(String wgTagStr, Set<String> beTagSet) { + if (StringUtils.isEmpty(wgTagStr)) { + return true; + } + if (beTagSet.isEmpty()) { + return false; + } + + String[] wgTagArr = wgTagStr.split(","); + Set<String> wgTagSet = new HashSet<>(); + for (String wgTag : wgTagArr) { + wgTagSet.add(wgTag.trim()); + } + + return !Collections.disjoint(wgTagSet, beTagSet); + } + } diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java index 1e73dc79510..5f1e3565966 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java @@ -194,15 +194,24 @@ public class WorkloadGroupMgrTest { Config.enable_workload_group = true; ConnectContext context = new ConnectContext(); WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr(); - Map<String, String> properties = Maps.newHashMap(); + Map<String, String> p0 = Maps.newHashMap(); String name = "g1"; try { - AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name, properties); + AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name, p0); + workloadGroupMgr.alterWorkloadGroup(stmt1); + } catch (DdlException e) { + Assert.assertTrue(e.getMessage().contains("alter workload group should contain at least one property")); + } + + p0.put(WorkloadGroup.CPU_SHARE, "10"); + try { + AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name, p0); workloadGroupMgr.alterWorkloadGroup(stmt1); } catch (DdlException e) { Assert.assertTrue(e.getMessage().contains("does not exist")); } + Map<String, String> properties = Maps.newHashMap(); properties.put(WorkloadGroup.CPU_SHARE, "10"); properties.put(WorkloadGroup.MEMORY_LIMIT, "30%"); CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties); diff --git 
a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index b469242ee63..1e52d94f7bb 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -268,6 +268,7 @@ struct TWorkloadGroupInfo { 13: optional i32 spill_threshold_high_watermark 14: optional i64 read_bytes_per_second 15: optional i64 remote_read_bytes_per_second + 16: optional string tag } enum TWorkloadMetricType { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org