This is an automated email from the ASF dual-hosted git repository.
vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 26ce91bddd [Query Resource Isolation] Workload Configs (#15109)
26ce91bddd is described below
commit 26ce91bdddfde13cc78fb960477f56f21662d2a1
Author: Praveen <[email protected]>
AuthorDate: Fri Jun 13 09:28:19 2025 -0700
[Query Resource Isolation] Workload Configs (#15109)
* Workload Configs
* workload config
* Add API
* config
* Change config structure
* Propagation strategy
* Fix style check
* Cost splitting on update
* Table addition propagation
* perf
* Tests
* test
* test 2
* Review comments 1
* review comments 3
* review comments 3
* name change
* review comments 4
---
.../BrokerUserDefinedMessageHandlerFactory.java | 31 ++
.../messages/QueryWorkloadRefreshMessage.java | 69 +++++
.../pinot/common/metadata/ZKMetadataProvider.java | 57 ++++
.../utils/config/QueryWorkloadConfigUtils.java | 234 +++++++++++++++
.../utils/config/QueryWorkloadConfigUtilsTest.java | 199 ++++++++++++
.../pinot/controller/api/resources/Constants.java | 1 +
.../PinotQueryWorkloadRestletResource.java | 332 +++++++++++++++++++++
.../helix/core/PinotHelixResourceManager.java | 61 +++-
.../controller/workload/QueryWorkloadManager.java | 203 +++++++++++++
.../workload/scheme/DefaultPropagationScheme.java | 51 ++++
.../workload/scheme/PropagationScheme.java | 36 +++
.../workload/scheme/PropagationSchemeProvider.java | 42 +++
.../workload/scheme/PropagationUtils.java | 211 +++++++++++++
.../workload/scheme/TablePropagationScheme.java | 78 +++++
.../workload/scheme/TenantPropagationScheme.java | 72 +++++
.../controller/workload/splitter/CostSplitter.java | 48 +++
.../workload/splitter/DefaultCostSplitter.java | 51 ++++
.../controller/workload/PropagationUtilsTest.java | 214 +++++++++++++
.../java/org/apache/pinot/core/auth/Actions.java | 4 +
.../server/starter/helix/BaseServerStarter.java | 6 +
.../helix/QueryWorkloadMessageHandlerFactory.java | 87 ++++++
.../spi/config/workload/EnforcementProfile.java | 78 +++++
.../pinot/spi/config/workload/InstanceCost.java | 79 +++++
.../pinot/spi/config/workload/NodeConfig.java | 126 ++++++++
.../spi/config/workload/PropagationScheme.java | 158 ++++++++++
.../spi/config/workload/QueryWorkloadConfig.java | 109 +++++++
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
27 files changed, 2637 insertions(+), 1 deletion(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index 81ea3d0d4f..1b2e7ed045 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -28,10 +28,12 @@ import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.spi.config.workload.InstanceCost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +73,9 @@ public class BrokerUserDefinedMessageHandlerFactory
implements MessageHandlerFac
return new RefreshDatabaseConfigMessageHandler(new
DatabaseConfigRefreshMessage(message), context);
case ApplicationQpsQuotaRefreshMessage.REFRESH_APP_QUOTA_MSG_SUB_TYPE:
return new RefreshApplicationQpsQuotaMessageHandler(new
ApplicationQpsQuotaRefreshMessage(message), context);
+ case QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE:
+ case QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE:
+ return new QueryWorkloadRefreshMessageHandler(new
QueryWorkloadRefreshMessage(message), context);
default:
// NOTE: Log a warning and return no-op message handler for
unsupported message sub-types. This can happen when
// a new message sub-type is added, and the sender gets deployed
first while receiver is still running the
@@ -259,4 +264,30 @@ public class BrokerUserDefinedMessageHandlerFactory
implements MessageHandlerFac
LOGGER.error("Got error for no-op message handling (error code: {},
error type: {})", code, type, e);
}
}
+
+ private static class QueryWorkloadRefreshMessageHandler extends
MessageHandler {
+ final String _queryWorkloadName;
+ final InstanceCost _instanceCost;
+
+ QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage
queryWorkloadRefreshMessage,
+ NotificationContext context) {
+ super(queryWorkloadRefreshMessage, context);
+ _queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName();
+ _instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() {
+ // TODO: Add logic to invoke the query workload manager to
refresh/delete the query workload config
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode errorCode, ErrorType errorType)
{
+ LOGGER.error("Got error while refreshing query workload config for query
workload: {} (error code: {},"
+ + " error type: {})", _queryWorkloadName, errorCode, errorType, e);
+ }
+ }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
new file mode 100644
index 0000000000..85f4da123e
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.helix.model.Message;
+import org.apache.pinot.common.utils.config.QueryWorkloadConfigUtils;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+
+
+/**
+ * Message to refresh the query workload on the instances.
+ * This message includes the host-level cost for each instance.
+ */
+public class QueryWorkloadRefreshMessage extends Message {
+ public static final String REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE =
"REFRESH_QUERY_WORKLOAD";
+ public static final String DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE =
"DELETE_QUERY_WORKLOAD";
+ public static final String QUERY_WORKLOAD_NAME = "queryWorkloadName";
+ public static final String INSTANCE_COST = "instanceCost";
+
+ /**
+ * Constructor for the sender.
+ */
+ public QueryWorkloadRefreshMessage(String queryWorkloadName, String
messageSubType,
+ @Nullable InstanceCost instanceCost) {
+ super(Message.MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+ setMsgSubType(messageSubType);
+ // Give it infinite time to process the message, as long as session is
alive
+ setExecutionTimeout(-1);
+ QueryWorkloadConfigUtils.updateZNRecordWithInstanceCost(getRecord(),
queryWorkloadName, instanceCost);
+ }
+
+ /**
+ * Constructor for the receiver.
+ */
+ public QueryWorkloadRefreshMessage(Message message) {
+ super(message.getRecord());
+    if (!message.getMsgSubType().equals(REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)
+        && !message.getMsgSubType().equals(DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
+ throw new IllegalArgumentException("Unknown message subtype:" +
message.getMsgSubType());
+ }
+ }
+
+ public String getQueryWorkloadName() {
+ return getRecord().getSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME);
+ }
+
+ public InstanceCost getInstanceCost() {
+ return QueryWorkloadConfigUtils.getInstanceCostFromZNRecord(getRecord());
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index dfb2bbe401..cfb4ad4546 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -43,12 +43,14 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.LogicalTableConfigUtils;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
+import org.apache.pinot.common.utils.config.QueryWorkloadConfigUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.spi.config.ConfigUtils;
import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.user.UserConfig;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -82,6 +84,7 @@ public class ZKMetadataProvider {
private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX =
"/CONFIGS/CLUSTER";
private static final String PROPERTYSTORE_SEGMENT_LINEAGE =
"/SEGMENT_LINEAGE";
private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX =
"/MINION_TASK_METADATA";
+ private static final String PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX =
"/CONFIGS/QUERYWORKLOAD";
public static void setUserConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String username, ZNRecord znRecord) {
propertyStore.set(constructPropertyStorePathForUserConfig(username),
znRecord, AccessOption.PERSISTENT);
@@ -305,6 +308,14 @@ public class ZKMetadataProvider {
return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX,
tableNameWithType);
}
+ public static String getPropertyStoreWorkloadConfigsPrefix() {
+ return PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX;
+ }
+
+ public static String constructPropertyStorePathForQueryWorkloadConfig(String
workloadName) {
+ return StringUtil.join("/", PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX,
workloadName);
+ }
+
@Deprecated
public static String
constructPropertyStorePathForMinionTaskMetadataDeprecated(String taskType,
String tableNameWithType) {
@@ -837,6 +848,52 @@ public class ZKMetadataProvider {
}
}
+ public static List<QueryWorkloadConfig>
getAllQueryWorkloadConfigs(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ List<ZNRecord> znRecords =
+ propertyStore.getChildren(getPropertyStoreWorkloadConfigsPrefix(),
null, AccessOption.PERSISTENT,
+ CommonConstants.Helix.ZkClient.RETRY_COUNT,
CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
+ if (znRecords == null) {
+ return Collections.emptyList();
+ }
+ int numZNRecords = znRecords.size();
+ List<QueryWorkloadConfig> queryWorkloadConfigs = new
ArrayList<>(numZNRecords);
+ for (ZNRecord znRecord : znRecords) {
+
queryWorkloadConfigs.add(QueryWorkloadConfigUtils.fromZNRecord(znRecord));
+ }
+ return queryWorkloadConfigs;
+ }
+
+ @Nullable
+ public static QueryWorkloadConfig
getQueryWorkloadConfig(ZkHelixPropertyStore<ZNRecord> propertyStore,
+ String workloadName) {
+ ZNRecord znRecord =
propertyStore.get(constructPropertyStorePathForQueryWorkloadConfig(workloadName),
+ null, AccessOption.PERSISTENT);
+ if (znRecord == null) {
+ return null;
+ }
+ return QueryWorkloadConfigUtils.fromZNRecord(znRecord);
+ }
+
+ public static boolean setQueryWorkloadConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore,
+ QueryWorkloadConfig queryWorkloadConfig) {
+ String path =
constructPropertyStorePathForQueryWorkloadConfig(queryWorkloadConfig.getQueryWorkloadName());
+ boolean isNewConfig = !propertyStore.exists(path, AccessOption.PERSISTENT);
+ ZNRecord znRecord = isNewConfig ? new
ZNRecord(queryWorkloadConfig.getQueryWorkloadName())
+ : propertyStore.get(path, null, AccessOption.PERSISTENT);
+ // Update the record with new workload configuration
+ QueryWorkloadConfigUtils.updateZNRecordWithWorkloadConfig(znRecord,
queryWorkloadConfig);
+ // Create or update based on existence
+ return isNewConfig ? propertyStore.create(path, znRecord,
AccessOption.PERSISTENT)
+ : propertyStore.set(path, znRecord, AccessOption.PERSISTENT);
+ }
+
+ public static void deleteQueryWorkloadConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String workloadName) {
+ String propertyStorePath =
constructPropertyStorePathForQueryWorkloadConfig(workloadName);
+ if (propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
+ propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT);
+ }
+ }
+
public static void setLogicalTableConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore,
LogicalTableConfig logicalTableConfig) {
try {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java
new file mode 100644
index 0000000000..034e4724f0
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.config;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.common.utils.http.HttpClientConfig;
+import org.apache.pinot.common.utils.tls.TlsUtils;
+import org.apache.pinot.spi.config.workload.EnforcementProfile;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationScheme;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+
+
+public class QueryWorkloadConfigUtils {
+ private QueryWorkloadConfigUtils() {
+ }
+
+ private static final Logger LOGGER =
org.slf4j.LoggerFactory.getLogger(QueryWorkloadConfigUtils.class);
+ private static final HttpClient HTTP_CLIENT = new
HttpClient(HttpClientConfig.DEFAULT_HTTP_CLIENT_CONFIG,
+ TlsUtils.getSslContext());
+
+ /**
+ * Converts a ZNRecord into a QueryWorkloadConfig object by extracting
mapFields.
+ *
+ * @param znRecord The ZNRecord containing workload config data.
+ * @return A QueryWorkloadConfig object.
+ */
+ public static QueryWorkloadConfig fromZNRecord(ZNRecord znRecord) {
+ Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null");
+ String queryWorkloadName =
znRecord.getSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME);
+ Preconditions.checkNotNull(queryWorkloadName, "queryWorkloadName cannot be
null");
+ String nodeConfigsJson =
znRecord.getSimpleField(QueryWorkloadConfig.NODE_CONFIGS);
+ Preconditions.checkNotNull(nodeConfigsJson, "nodeConfigs cannot be null");
+ try {
+ List<NodeConfig> nodeConfigs = JsonUtils.stringToObject(nodeConfigsJson,
new TypeReference<>() { });
+ return new QueryWorkloadConfig(queryWorkloadName, nodeConfigs);
+ } catch (Exception e) {
+ String errorMessage = String.format("Failed to convert ZNRecord : %s to
QueryWorkloadConfig", znRecord);
+ throw new RuntimeException(errorMessage, e);
+ }
+ }
+
+ /**
+ * Updates a ZNRecord with the fields from a WorkloadConfig object.
+ *
+ * @param queryWorkloadConfig The QueryWorkloadConfig object to convert.
+ * @param znRecord The ZNRecord to update.
+ */
+ public static void updateZNRecordWithWorkloadConfig(ZNRecord znRecord,
QueryWorkloadConfig queryWorkloadConfig) {
+ znRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
queryWorkloadConfig.getQueryWorkloadName());
+ try {
+ znRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
+ JsonUtils.objectToString(queryWorkloadConfig.getNodeConfigs()));
+ } catch (Exception e) {
+ String errorMessage = String.format("Failed to convert
QueryWorkloadConfig : %s to ZNRecord",
+ queryWorkloadConfig);
+ throw new RuntimeException(errorMessage, e);
+ }
+ }
+
+ public static void updateZNRecordWithInstanceCost(ZNRecord znRecord, String
queryWorkloadName,
+ InstanceCost instanceCost) {
+ Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null");
+ Preconditions.checkNotNull(instanceCost, "InstanceCost cannot be null");
+ try {
+ znRecord.setSimpleField(QueryWorkloadRefreshMessage.QUERY_WORKLOAD_NAME,
queryWorkloadName);
+ znRecord.setSimpleField(QueryWorkloadRefreshMessage.INSTANCE_COST,
JsonUtils.objectToString(instanceCost));
+ } catch (Exception e) {
+ String errorMessage = String.format("Failed to convert InstanceCost : %s
to ZNRecord",
+ instanceCost);
+ throw new RuntimeException(errorMessage, e);
+ }
+ }
+
+ public static InstanceCost getInstanceCostFromZNRecord(ZNRecord znRecord) {
+ Preconditions.checkNotNull(znRecord, "ZNRecord cannot be null");
+ String instanceCostJson =
znRecord.getSimpleField(QueryWorkloadRefreshMessage.INSTANCE_COST);
+ Preconditions.checkNotNull(instanceCostJson, "InstanceCost cannot be
null");
+ try {
+ return JsonUtils.stringToObject(instanceCostJson, InstanceCost.class);
+ } catch (Exception e) {
+ String errorMessage = String.format("Failed to convert ZNRecord : %s to
InstanceCost", znRecord);
+ throw new RuntimeException(errorMessage, e);
+ }
+ }
+ /**
+ * Fetches query workload configs for a specific instance from the
controller.
+ *
+ * @param controllerUrl The URL of the controller.
+ * @param instanceId The ID of the instance to fetch configs for.
+ * @param nodeType The type of node (e.g., BROKER, SERVER).
+ * @return A map of workload names to their corresponding InstanceCost
objects.
+ */
+ public static Map<String, InstanceCost>
getQueryWorkloadConfigsFromController(String controllerUrl, String instanceId,
+
NodeConfig.Type nodeType) {
+ try {
+ if (controllerUrl == null || controllerUrl.isEmpty()) {
+ LOGGER.warn("Controller URL is empty, cannot fetch query workload
configs for instance: {}", instanceId);
+ return Collections.emptyMap();
+ }
+ URI queryWorkloadURI = new URI(controllerUrl +
"/queryWorkloadConfigs/instance/" + instanceId + "?nodeType="
+ + nodeType);
+ ClassicHttpRequest request = ClassicRequestBuilder.get(queryWorkloadURI)
+ .setVersion(HttpVersion.HTTP_1_1)
+ .setHeader(HttpHeaders.CONTENT_TYPE,
HttpClient.JSON_CONTENT_TYPE)
+ .build();
+ AtomicReference<Map<String, InstanceCost>> workloadToInstanceCost = new
AtomicReference<>(null);
+ RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(3,
3000L, 1.2f);
+ retryPolicy.attempt(() -> {
+ try {
+ SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(
+ HTTP_CLIENT.sendRequest(request,
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS)
+ );
+ if (response.getStatusCode() == HttpStatus.SC_OK) {
+
workloadToInstanceCost.set(JsonUtils.stringToObject(response.getResponse(), new
TypeReference<>() { }));
+ LOGGER.info("Successfully fetched query workload configs from
controller: {}, Instance: {}",
+ controllerUrl, instanceId);
+ return true;
+ } else if (response.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
+ LOGGER.info("No query workload configs found for controller: {},
Instance: {}", controllerUrl, instanceId);
+ workloadToInstanceCost.set(Collections.emptyMap());
+ return true;
+ } else {
+ LOGGER.warn("Failed to fetch query workload configs from
controller: {}, Instance: {}, Status: {}",
+ controllerUrl, instanceId, response.getStatusCode());
+ return false;
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to fetch query workload configs from controller:
{}, Instance: {}",
+ controllerUrl, instanceId, e);
+ return false;
+ }
+ });
+ return workloadToInstanceCost.get();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to fetch query workload configs from controller: {},
Instance: {}",
+ controllerUrl, instanceId, e);
+ return Collections.emptyMap();
+ }
+ }
+
+ /**
+ * Validates the given QueryWorkloadConfig and returns a list of validation
error messages.
+ *
+ * @param config the QueryWorkloadConfig to validate
+ * @return a list of validation errors; empty if config is valid
+ */
+ public static List<String> validateQueryWorkloadConfig(QueryWorkloadConfig
config) {
+ List<String> errors = new ArrayList<>();
+ if (config == null) {
+ errors.add("QueryWorkloadConfig cannot be null");
+ return errors;
+ }
+ String name = config.getQueryWorkloadName();
+ if (name == null || name.trim().isEmpty()) {
+ errors.add("queryWorkloadName cannot be null or empty");
+ }
+ List<NodeConfig> nodeConfigs = config.getNodeConfigs();
+ if (nodeConfigs == null || nodeConfigs.isEmpty()) {
+ errors.add("nodeConfigs cannot be null or empty");
+ } else {
+ for (int i = 0; i < nodeConfigs.size(); i++) {
+ NodeConfig nodeConfig = nodeConfigs.get(i);
+ String prefix = "nodeConfigs[" + i + "]";
+ if (nodeConfig == null) {
+ errors.add(prefix + " cannot be null");
+ continue;
+ }
+ if (nodeConfig.getNodeType() == null) {
+ errors.add(prefix + ".type cannot be null");
+ }
+ // Validate EnforcementProfile
+ EnforcementProfile enforcementProfile =
nodeConfig.getEnforcementProfile();
+ if (enforcementProfile == null) {
+          errors.add(prefix + ".enforcementProfile cannot be null");
+ } else {
+ if (enforcementProfile.getCpuCostNs() < 0) {
+ errors.add(prefix + ".enforcementProfile.cpuCostNs cannot be
negative");
+ }
+ if (enforcementProfile.getMemoryCostBytes() < 0) {
+ errors.add(prefix + ".enforcementProfile.memoryCostBytes cannot
be negative");
+ }
+ }
+ // Validate PropagationScheme
+ PropagationScheme propagationScheme =
nodeConfig.getPropagationScheme();
+ if (propagationScheme == null) {
+ errors.add(prefix + ".propagationScheme cannot be null");
+ } else {
+ if (propagationScheme.getPropagationType() == null) {
+ errors.add(prefix + ".propagationScheme.type cannot be null");
+ }
+ }
+ }
+ }
+ return errors;
+ }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtilsTest.java
new file mode 100644
index 0000000000..0bbe84b7fe
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtilsTest.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.config;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.config.workload.EnforcementProfile;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationScheme;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class QueryWorkloadConfigUtilsTest {
+
+ @Test(dataProvider = "fromZNRecordDataProvider")
+ public void testFromZNRecord(ZNRecord znRecord, QueryWorkloadConfig
expectedQueryWorkloadConfig, boolean shouldFail) {
+ try {
+ QueryWorkloadConfig actualQueryWorkloadConfig =
QueryWorkloadConfigUtils.fromZNRecord(znRecord);
+ if (shouldFail) {
+ Assert.fail("Expected an exception but none was thrown");
+ }
+ Assert.assertEquals(actualQueryWorkloadConfig,
expectedQueryWorkloadConfig);
+ } catch (Exception e) {
+ if (!shouldFail) {
+ Assert.fail("Caught unexpected exception: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ @DataProvider(name = "fromZNRecordDataProvider")
+ public Object[][] fromZNRecordDataProvider() throws JsonProcessingException {
+ List<Object[]> data = new ArrayList<>();
+
+ // Shared, valid configuration
+ EnforcementProfile validEnforcementProfile = new EnforcementProfile(100,
100);
+
+ // Server node
+ PropagationScheme serverPropagationScheme = new
PropagationScheme(PropagationScheme.Type.TABLE,
+ List.of("value1", "value2"));
+ NodeConfig serverNodeConfig = new NodeConfig(NodeConfig.Type.SERVER_NODE,
validEnforcementProfile,
+ serverPropagationScheme);
+
+ // Broker node
+ PropagationScheme brokerPropagationScheme = new
PropagationScheme(PropagationScheme.Type.TENANT,
+ List.of("value3", "value4"));
+ NodeConfig brokerNodeConfig = new NodeConfig(NodeConfig.Type.BROKER_NODE,
validEnforcementProfile,
+ brokerPropagationScheme);
+
+ List<NodeConfig> nodeConfigs = List.of(serverNodeConfig, brokerNodeConfig);
+ QueryWorkloadConfig validQueryWorkloadConfig = new
QueryWorkloadConfig("workloadId", nodeConfigs);
+
+ // Valid scenario: NODE_CONFIGS field is a JSON array string
+ ZNRecord validZnRecord = new ZNRecord("workloadId");
+ validZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"workloadId");
+ validZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
JsonUtils.objectToString(nodeConfigs));
+ data.add(new Object[] { validZnRecord, validQueryWorkloadConfig, false });
+
+ // Null propagation scheme
+ NodeConfig nodeConfigWithoutPropagationScheme = new
NodeConfig(NodeConfig.Type.SERVER_NODE, validEnforcementProfile,
+ null);
+ List<NodeConfig> nodeConfigsWithoutPropagation =
List.of(nodeConfigWithoutPropagationScheme);
+ ZNRecord znRecordNullPropagation = new ZNRecord("workloadId");
+
znRecordNullPropagation.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"workloadId");
+ znRecordNullPropagation.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
+ JsonUtils.objectToString(nodeConfigsWithoutPropagation));
+ QueryWorkloadConfig expectedQueryWorkloadConfigNullPropagation = new
QueryWorkloadConfig("workloadId",
+ nodeConfigsWithoutPropagation);
+ data.add(new Object[] { znRecordNullPropagation,
expectedQueryWorkloadConfigNullPropagation, false });
+
+ // Missing NODE_CONFIGS field
+ ZNRecord missingNodeConfigsZnRecord = new ZNRecord("workloadId");
+
missingNodeConfigsZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"workloadId");
+ data.add(new Object[] { missingNodeConfigsZnRecord, null, true });
+
+ // Invalid JSON in NODE_CONFIGS field
+ ZNRecord invalidJsonZnRecord = new ZNRecord("workloadId");
+
invalidJsonZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"workloadId");
+ invalidJsonZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
"{invalidJsonField: }");
+ data.add(new Object[] { invalidJsonZnRecord, null, true });
+
+ return data.toArray(new Object[0][]);
+ }
+
+ @Test(dataProvider = "updateZNRecordDataProvider")
+ public void testUpdateZNRecordWithWorkloadConfig(QueryWorkloadConfig
queryWorkloadConfig, ZNRecord znRecord,
+ ZNRecord expectedZnRecord, boolean shouldFail) {
+ try {
+ QueryWorkloadConfigUtils.updateZNRecordWithWorkloadConfig(znRecord,
queryWorkloadConfig);
+ if (shouldFail) {
+ Assert.fail("Expected an exception but none was thrown");
+ }
+ Assert.assertEquals(znRecord, expectedZnRecord);
+ } catch (Exception e) {
+ if (!shouldFail) {
+ Assert.fail("Caught unexpected exception: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ @DataProvider(name = "updateZNRecordDataProvider")
+ public Object[][] updateZNRecordDataProvider() throws
JsonProcessingException {
+ List<Object[]> data = new ArrayList<>();
+
+ EnforcementProfile validEnforcementProfile = new EnforcementProfile(100,
100);
+ // Server scheme
+ PropagationScheme serverPropagationScheme = new
PropagationScheme(PropagationScheme.Type.TABLE,
+ List.of("value1", "value2"));
+ NodeConfig serverNodeConfig = new NodeConfig(NodeConfig.Type.SERVER_NODE,
validEnforcementProfile,
+ serverPropagationScheme);
+ // Broker scheme
+ PropagationScheme brokerPropagationScheme = new
PropagationScheme(PropagationScheme.Type.TENANT,
+ List.of("value3", "value4"));
+ NodeConfig brokerNodeConfig = new NodeConfig(NodeConfig.Type.BROKER_NODE,
validEnforcementProfile,
+ brokerPropagationScheme);
+ List<NodeConfig> nodeConfigs = List.of(serverNodeConfig, brokerNodeConfig);
+ QueryWorkloadConfig validQueryWorkloadConfig = new
QueryWorkloadConfig("workloadId", nodeConfigs);
+
+ // 1) Valid scenario
+ ZNRecord validZnRecord = new ZNRecord("validId");
+ ZNRecord expectedValidZnRecord = new ZNRecord("validId");
+ validZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"workloadId");
+
expectedValidZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"workloadId");
+ String nodeConfigsJson = JsonUtils.objectToString(nodeConfigs);
+ validZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
nodeConfigsJson);
+ expectedValidZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
nodeConfigsJson);
+ data.add(new Object[] { validQueryWorkloadConfig, validZnRecord,
expectedValidZnRecord, false });
+
+ // 2) Null propagation scheme in both nodes
+ NodeConfig nodeConfigWithoutPropagation = new
NodeConfig(NodeConfig.Type.SERVER_NODE, validEnforcementProfile,
+ null);
+ List<NodeConfig> nodeConfigsWithoutPropagation =
List.of(nodeConfigWithoutPropagation);
+ QueryWorkloadConfig configWithoutPropagation = new
QueryWorkloadConfig("noPropagation",
+ nodeConfigsWithoutPropagation);
+
+ String nodeConfigsNoPropagationJson =
JsonUtils.objectToString(nodeConfigsWithoutPropagation);
+
+ ZNRecord znRecordNoPropagation = new ZNRecord("noPropagationId");
+ ZNRecord expectedZnRecordNoPropagation = new ZNRecord("noPropagationId");
+
znRecordNoPropagation.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"noPropagation");
+ znRecordNoPropagation.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
nodeConfigsNoPropagationJson);
+
+
expectedZnRecordNoPropagation.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"noPropagation");
+
expectedZnRecordNoPropagation.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
nodeConfigsNoPropagationJson);
+ data.add(new Object[] { configWithoutPropagation, znRecordNoPropagation,
expectedZnRecordNoPropagation, false });
+
+ // 3) Null server node in QueryWorkloadConfig
+ List<NodeConfig> nodeConfigsWithNullServerNode = List.of(brokerNodeConfig);
+ QueryWorkloadConfig nullServerNodeConfig = new
QueryWorkloadConfig("nullServer", nodeConfigsWithNullServerNode);
+ ZNRecord znRecordNullServer = new ZNRecord("nullServerId");
+ ZNRecord expectedZnRecordNullServer = new ZNRecord("nullServerId");
+ znRecordNullServer.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"nullServer");
+
expectedZnRecordNullServer.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"nullServer");
+ String nodeConfigsWithNullServerJson =
JsonUtils.objectToString(nodeConfigsWithNullServerNode);
+ znRecordNullServer.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
nodeConfigsWithNullServerJson);
+
expectedZnRecordNullServer.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
nodeConfigsWithNullServerJson);
+ data.add(new Object[] { nullServerNodeConfig, znRecordNullServer,
expectedZnRecordNullServer, false });
+
+ // 4) Null QueryWorkloadConfig -> should fail
+ ZNRecord znRecordNullConfig = new ZNRecord("nullConfigId");
+ data.add(new Object[] { null, znRecordNullConfig, null, true });
+
+ // 5) Null ZNRecord -> should fail
+ data.add(new Object[] { validQueryWorkloadConfig, null, null, true });
+
+ // 6) Behavior with empty ZNRecord ID
+ ZNRecord emptyIdZnRecord = new ZNRecord("");
+ ZNRecord expectedEmptyIdZnRecord = new ZNRecord("");
+ emptyIdZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"workloadId");
+
expectedEmptyIdZnRecord.setSimpleField(QueryWorkloadConfig.QUERY_WORKLOAD_NAME,
"workloadId");
+ String emptyNodeConfigsJson = JsonUtils.objectToString(nodeConfigs);
+ emptyIdZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
emptyNodeConfigsJson);
+ expectedEmptyIdZnRecord.setSimpleField(QueryWorkloadConfig.NODE_CONFIGS,
emptyNodeConfigsJson);
+ data.add(new Object[] { validQueryWorkloadConfig, emptyIdZnRecord,
expectedEmptyIdZnRecord, false });
+
+ return data.toArray(new Object[0][]);
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index fea05fc8b2..06305deb4c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -53,6 +53,7 @@ public class Constants {
public static final String APP_CONFIGS = "AppConfigs";
public static final String PERIODIC_TASK_TAG = "PeriodicTask";
public static final String UPSERT_RESOURCE_TAG = "Upsert";
+ public static final String QUERY_WORKLOAD_TAG = "QueryWorkload";
public static final String REALTIME_SEGMENT_VALIDATION_MANAGER =
"RealtimeSegmentValidationManager";
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryWorkloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryWorkloadRestletResource.java
new file mode 100644
index 0000000000..35bdf6f9a2
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryWorkloadRestletResource.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.common.utils.config.QueryWorkloadConfigUtils;
+import org.apache.pinot.controller.api.access.AccessType;
+import org.apache.pinot.controller.api.access.Authenticate;
+import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+@Api(tags = Constants.QUERY_WORKLOAD_TAG, authorizations =
{@Authorization(value = SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = {
+ @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key =
+ SWAGGER_AUTHORIZATION_KEY, description =
+ "The format of the key is ```\"Basic <token>\" or \"Bearer "
+ + "<token>\"```"), @ApiKeyAuthDefinition(name =
CommonConstants.QUERY_WORKLOAD, in =
+ ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key =
CommonConstants.QUERY_WORKLOAD, description =
+ "Workload context passed through http header. If no context is provided
'default' workload "
+ + "context will be considered.")
+}))
+@Path("/")
+public class PinotQueryWorkloadRestletResource {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(PinotQueryWorkloadRestletResource.class);
+
+ @Inject
+ PinotHelixResourceManager _pinotHelixResourceManager;
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/queryWorkloadConfigs")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_QUERY_WORKLOAD_CONFIG)
+ @Authenticate(AccessType.READ)
+ @ApiOperation(value = "Get all query workload configs", notes = "Get all
workload configs")
+ public String getQueryWorkloadConfigs(@Context HttpHeaders httpHeaders) {
+ try {
+ LOGGER.info("Received request to get all queryWorkloadConfigs");
+ List<QueryWorkloadConfig> queryWorkloadConfigs =
_pinotHelixResourceManager.getAllQueryWorkloadConfigs();
+ if (queryWorkloadConfigs.isEmpty()) {
+ return JsonUtils.objectToString(Map.of());
+ }
+ String response = JsonUtils.objectToString(queryWorkloadConfigs);
+ LOGGER.info("Successfully fetched all queryWorkloadConfigs");
+ return response;
+ } catch (Exception e) {
+ String errorMessage = String.format("Error while getting all workload
configs, error: %s", e);
+ throw new ControllerApplicationException(LOGGER, errorMessage,
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ /**
+ * API to fetch query workload config
+ * @param queryWorkloadName Name of the query workload
+ * Example request:
+ * /queryWorkloadConfigs/workload-foo1
+ * Example response:
+ * {
+ * "queryWorkloadName" : "workload-foo1",
+ * "nodeConfigs" : {
+ * {
+ * "nodeType" : "brokerNode",
+ * "enforcementProfile": {
+ * "cpuCostNs": 500,
+ * "memoryCostBytes": 1000
+ * },
+ * "propagationScheme": {
+ * "propagationType": "TABLE",
+ * "values": ["airlineStats"]
+ * }
+ * },
+ * {
+ * "nodeType" : "serverNode",
+ * "enforcementProfile": {
+ * "cpuCostNs": 1500,
+ * "memoryCostBytes": 12000
+ * },
+ * "propagationScheme": {
+ * "propagationType": "TENANT",
+ * "values": ["DefaultTenant"]
+ * }
+ * }
+ * }
+ * }
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/queryWorkloadConfigs/{queryWorkloadName}")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_QUERY_WORKLOAD_CONFIG)
+ @Authenticate(AccessType.READ)
+ @ApiOperation(value = "Get query workload config", notes = "Get workload
configs for the workload name")
+ public String getQueryWorkloadConfig(@PathParam("queryWorkloadName") String
queryWorkloadName,
+ @Context HttpHeaders httpHeaders) {
+ try {
+ LOGGER.info("Received request to get workload config for workload: {}",
queryWorkloadName);
+ QueryWorkloadConfig queryWorkloadConfig =
_pinotHelixResourceManager.getQueryWorkloadConfig(queryWorkloadName);
+ if (queryWorkloadConfig == null) {
+ throw new ControllerApplicationException(LOGGER, "Workload config not
found for workload: " + queryWorkloadName,
+ Response.Status.NOT_FOUND, null);
+ }
+ String response = queryWorkloadConfig.toJsonString();
+ LOGGER.info("Successfully fetched workload config for workload: {}",
queryWorkloadName);
+ return response;
+ } catch (Exception e) {
+ if (e instanceof ControllerApplicationException) {
+ throw (ControllerApplicationException) e;
+ } else {
+ String errorMessage = String.format("Error while getting workload
config for workload: %s, error: %s",
+ queryWorkloadName, e);
+ throw new ControllerApplicationException(LOGGER, errorMessage,
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+ }
+
+
+ /**
+ * API to get all workload configs associated with the instance
+ * @param instanceName Helix instance name
+ * @return Map of workload name to instance cost
+ * Example request:
+ * /queryWorkloadConfigs/instance/Server_localhost_1234
+ * Example response:
+ * {
+ * "workload1": {
+ * "cpuCostNs": 100,
+ * "memoryCostBytes":100
+ * },
+ * "workload2": {
+ * "cpuCostNs": 50,
+ * "memoryCostBytes": 50
+ * }
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/queryWorkloadConfigs/instance/{instanceName}")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_INSTANCE_QUERY_WORKLOAD_CONFIG)
+ @Authenticate(AccessType.READ)
+ @ApiOperation(value = "Get all workload configs associated with the
instance",
+ notes = "Get all workload configs associated with the instance")
+ public String getQueryWorkloadConfigForInstance(@PathParam("instanceName")
String instanceName,
+ @Context HttpHeaders
httpHeaders) {
+ try {
+ Map<String, InstanceCost> workloadToInstanceCostMap =
_pinotHelixResourceManager.getQueryWorkloadManager()
+ .getWorkloadToInstanceCostFor(instanceName);
+ if (workloadToInstanceCostMap == null ||
workloadToInstanceCostMap.isEmpty()) {
+ throw new ControllerApplicationException(LOGGER, "No workload configs
found for instance: " + instanceName,
+ Response.Status.NOT_FOUND, null);
+ }
+ return JsonUtils.objectToString(workloadToInstanceCostMap);
+ } catch (Exception e) {
+ if (e instanceof ControllerApplicationException) {
+ throw (ControllerApplicationException) e;
+ } else {
+ String errorMessage = String.format("Error while getting workload
config for instance: %s, error: %s",
+ instanceName, e);
+ throw new ControllerApplicationException(LOGGER, errorMessage,
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+ }
+
+ /**
+ * Updates the query workload config
+ * @param requestString JSON string representing the QueryWorkloadConfig
+ * Example request:
+ * {
+ * "queryWorkloadName" : "workload-foo1",
+ * "nodeConfigs" : {
+ * {
+ * "nodeType" : "brokerNode",
+ * "enforcementProfile": {
+ * "cpuCostNs": 500,
+ * "memoryCostBytes": 1000
+ * },
+ * "propagationScheme": {
+ * "propagationType": "TABLE",
+ * "values": ["airlineStats"]
+ * }
+ * },
+ * {
+ * "nodeType" : "serverNode",
+ * "enforcementProfile": {
+ * "cpuCostNs": 1500,
+ * "memoryCostBytes": 12000
+ * },
+ * "propagationScheme": {
+ * "propagationType": "TENANT",
+ * "values": ["DefaultTenant"]
+ * }
+ * }
+ * }
+ * }
+ *
+ */
+ @POST
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/queryWorkloadConfigs")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.UPDATE_QUERY_WORKLOAD_CONFIG)
+ @Authenticate(AccessType.UPDATE)
+ @ApiOperation(value = "Update query workload config", notes = "Update
workload config for the workload name")
+ public Response updateQueryWorkloadConfig(String requestString, @Context
HttpHeaders httpHeaders) {
+ try {
+ LOGGER.info("Received request to update queryWorkloadConfig with
request: {}", requestString);
+ QueryWorkloadConfig queryWorkloadConfig =
JsonUtils.stringToObject(requestString, QueryWorkloadConfig.class);
+ List<String> validationErrors =
QueryWorkloadConfigUtils.validateQueryWorkloadConfig(queryWorkloadConfig);
+ if (!validationErrors.isEmpty()) {
+ String errorMessage = String.format("Invalid query workload config:
%s", validationErrors);
+ throw new ControllerApplicationException(LOGGER, errorMessage,
Response.Status.BAD_REQUEST, null);
+ }
+ _pinotHelixResourceManager.setQueryWorkloadConfig(queryWorkloadConfig);
+ String successMessage = String.format("Query Workload config updated
successfully for workload: %s",
+ queryWorkloadConfig.getQueryWorkloadName());
+ LOGGER.info(successMessage);
+ return Response.ok().entity(successMessage).build();
+ } catch (Exception e) {
+ String errorMessage = String.format("Error when updating query workload
request: %s, error: %s",
+ requestString, e);
+ throw new ControllerApplicationException(LOGGER, errorMessage,
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ /**
+ * Deletes the query workload config
+ * @param queryWorkloadName Name of the query workload to be deleted
+ * Example request:
+ * /queryWorkloadConfigs/workload-foo1
+ */
+ @DELETE
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/queryWorkloadConfigs/{queryWorkloadName}")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.DELETE_QUERY_WORKLOAD_CONFIG)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation(value = "Delete query workload config", notes = "Delete
workload config for the workload name")
+ public Response deleteQueryWorkloadConfig(@PathParam("queryWorkloadName")
String queryWorkloadName,
+ @Context HttpHeaders httpHeaders) {
+ try {
+ _pinotHelixResourceManager.deleteQueryWorkloadConfig(queryWorkloadName);
+ String successMessage = String.format("Query Workload config deleted
successfully for workload: %s",
+ queryWorkloadName);
+ LOGGER.info(successMessage);
+ return Response.ok().entity(successMessage).build();
+ } catch (Exception e) {
+ String errorMessage = String.format("Error when deleting query workload
for workload: %s, error: %s",
+ queryWorkloadName, e);
+ throw new ControllerApplicationException(LOGGER, errorMessage,
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ /**
+ * API to refresh propagation for a single query workload config
+ * This API doesn't update the config, it only triggers the propagation of
an existing workload config
+ *
+ * @param queryWorkloadName Name of the query workload to refresh
+ * Example request:
+ * POST /queryWorkloadConfigs/{queryWorkloadName}/refresh
+ */
+ @POST
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/queryWorkloadConfigs/{queryWorkloadName}/refresh")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.UPDATE_QUERY_WORKLOAD_CONFIG)
+ @Authenticate(AccessType.UPDATE)
+ @ApiOperation(value = "Refresh query workload config propagation", notes =
"Force propagation of an existing config")
+ public Response refreshQueryWorkloadConfig(@PathParam("queryWorkloadName")
String queryWorkloadName,
+ @Context HttpHeaders httpHeaders)
{
+ try {
+ LOGGER.info("Received request to refresh workload config propagation for
workload: {}", queryWorkloadName);
+ // Fetch existing config
+ QueryWorkloadConfig existingConfig =
_pinotHelixResourceManager.getQueryWorkloadConfig(queryWorkloadName);
+ if (existingConfig == null) {
+ throw new ControllerApplicationException(LOGGER, "Workload config not
found for workload: " + queryWorkloadName,
+ Response.Status.NOT_FOUND, null);
+ }
+
_pinotHelixResourceManager.getQueryWorkloadManager().propagateWorkloadUpdateMessage(existingConfig);
+ String successMessage = String.format("Query workload config propagation
triggered for workload: %s",
+ queryWorkloadName);
+ LOGGER.info(successMessage);
+ return Response.ok().entity(successMessage).build();
+ } catch (Exception e) {
+ if (e instanceof ControllerApplicationException) {
+ throw (ControllerApplicationException) e;
+ } else {
+ String errorMessage = String.format("Error when refreshing query
workload config for workload: %s, error: %s",
+ queryWorkloadName, e);
+ throw new ControllerApplicationException(LOGGER, errorMessage,
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a07caac5f1..5589bce4be 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -105,6 +105,7 @@ import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
@@ -155,6 +156,7 @@ import
org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
+import org.apache.pinot.controller.workload.QueryWorkloadManager;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.instance.Instance;
@@ -170,6 +172,7 @@ import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.user.ComponentType;
import org.apache.pinot.spi.config.user.RoleType;
import org.apache.pinot.spi.config.user.UserConfig;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -237,6 +240,7 @@ public class PinotHelixResourceManager {
private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
private TableCache _tableCache;
private final LineageManager _lineageManager;
+ private final QueryWorkloadManager _queryWorkloadManager;
public PinotHelixResourceManager(String zkURL, String helixClusterName,
@Nullable String dataDir,
boolean isSingleTenantCluster, boolean enableBatchMessageMode, int
deletedSegmentsRetentionInDays,
@@ -263,6 +267,7 @@ public class PinotHelixResourceManager {
_lineageUpdaterLocks[i] = new Object();
}
_lineageManager = lineageManager;
+ _queryWorkloadManager = new QueryWorkloadManager(this);
}
public PinotHelixResourceManager(ControllerConf controllerConf) {
@@ -544,6 +549,11 @@ public class PinotHelixResourceManager {
.filter(instance ->
InstanceTypeUtils.isMinion(instance.getId())).collect(Collectors.toList());
}
+ public List<String> getAllServerInstances() {
+ return HelixHelper.getAllInstances(_helixAdmin,
_helixClusterName).stream().filter(InstanceTypeUtils::isServer)
+ .collect(Collectors.toList());
+ }
+
/**
* Get all instances with the given tag
*/
@@ -1831,7 +1841,7 @@ public class PinotHelixResourceManager {
.put(tableNameWithType,
SegmentAssignmentUtils.getInstanceStateMap(brokers,
BrokerResourceStateModel.ONLINE));
return is;
});
-
+ _queryWorkloadManager.propagateWorkloadFor(tableNameWithType);
LOGGER.info("Adding table {}: Successfully added table",
tableNameWithType);
}
@@ -2195,6 +2205,8 @@ public class PinotHelixResourceManager {
// Send update query quota message if quota is specified
sendTableConfigRefreshMessage(tableNameWithType);
+    // TODO: Propagate workload for tables only if there is a change in instance characteristics
+ _queryWorkloadManager.propagateWorkloadFor(tableNameWithType);
}
public void deleteUser(String username) {
@@ -4788,6 +4800,53 @@ public class PinotHelixResourceManager {
return tagMinInstanceMap;
}
  /**
   * Returns all {@link QueryWorkloadConfig}s stored in the ZK property store.
   */
  public List<QueryWorkloadConfig> getAllQueryWorkloadConfigs() {
    return ZKMetadataProvider.getAllQueryWorkloadConfigs(_propertyStore);
  }
+
  /**
   * Returns the {@link QueryWorkloadConfig} for the given workload name, or {@code null} if none exists.
   */
  @Nullable
  public QueryWorkloadConfig getQueryWorkloadConfig(String queryWorkloadName) {
    return ZKMetadataProvider.getQueryWorkloadConfig(_propertyStore, queryWorkloadName);
  }
+
  /**
   * Persists the given query workload config to the ZK property store and then propagates a refresh
   * message to the affected instances.
   *
   * @throws RuntimeException if the config could not be written to the property store
   */
  public void setQueryWorkloadConfig(QueryWorkloadConfig queryWorkloadConfig) {
    if (!ZKMetadataProvider.setQueryWorkloadConfig(_propertyStore, queryWorkloadConfig)) {
      throw new RuntimeException("Failed to set workload config for queryWorkloadName: "
          + queryWorkloadConfig.getQueryWorkloadName());
    }
    // Propagate only after the config is durably stored, so receivers observe the new state
    _queryWorkloadManager.propagateWorkloadUpdateMessage(queryWorkloadConfig);
  }
+
+ public void sendQueryWorkloadRefreshMessage(Map<String,
QueryWorkloadRefreshMessage> instanceToRefreshMessageMap) {
+ instanceToRefreshMessageMap.forEach((instance, message) -> {
+ Criteria criteria = new Criteria();
+ criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ criteria.setInstanceName(instance);
+ criteria.setSessionSpecific(true);
+
+ int numMessagesSent =
_helixZkManager.getMessagingService().send(criteria, message, null, -1);
+ if (numMessagesSent > 0) {
+ LOGGER.info("Sent {} query workload config refresh messages to
instance: {}", numMessagesSent, instance);
+ } else {
+ LOGGER.warn("No query workload config refresh message sent to
instance: {}", instance);
+ }
+ });
+ }
+
  /**
   * Deletes the query workload config for the given workload name. First notifies the affected
   * instances so they can drop enforcement state, then removes the config from ZK.
   * No-op (with a warning) if the config does not exist.
   */
  public void deleteQueryWorkloadConfig(String workload) {
    QueryWorkloadConfig queryWorkloadConfig = getQueryWorkloadConfig(workload);
    if (queryWorkloadConfig == null) {
      LOGGER.warn("Query workload config for {} does not exist, skipping deletion", workload);
      return;
    }
    // Propagate the delete before removing the ZNode so instance resolution can still use the config
    _queryWorkloadManager.propagateDeleteWorkloadMessage(queryWorkloadConfig);
    ZKMetadataProvider.deleteQueryWorkloadConfig(_propertyStore, workload);
  }
+
  /**
   * Returns the {@link QueryWorkloadManager} used for workload cost computation and propagation.
   */
  public QueryWorkloadManager getQueryWorkloadManager() {
    return _queryWorkloadManager;
  }
+
/*
* Uncomment and use for testing on a real cluster
public static void main(String[] args) throws Exception {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java
new file mode 100644
index 0000000000..c093b98755
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.workload.scheme.PropagationScheme;
+import org.apache.pinot.controller.workload.scheme.PropagationSchemeProvider;
+import org.apache.pinot.controller.workload.scheme.PropagationUtils;
+import org.apache.pinot.controller.workload.splitter.CostSplitter;
+import org.apache.pinot.controller.workload.splitter.DefaultCostSplitter;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The QueryWorkloadManager is responsible for managing the query workload
configuration and propagating/computing
+ * the cost to be enforced by relevant instances based on the propagation
scheme.
+ */
+public class QueryWorkloadManager {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(QueryWorkloadManager.class);
+
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
+ private final PropagationSchemeProvider _propagationSchemeProvider;
+ private final CostSplitter _costSplitter;
+
+ public QueryWorkloadManager(PinotHelixResourceManager
pinotHelixResourceManager) {
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ _propagationSchemeProvider = new
PropagationSchemeProvider(pinotHelixResourceManager);
+ // TODO: To make this configurable once we have multiple cost splitters
implementations
+ _costSplitter = new DefaultCostSplitter();
+ }
+
+ /**
+ * Propagate the workload to the relevant instances based on the
PropagationScheme
+ * @param queryWorkloadConfig The query workload configuration to propagate
+ * 1. Resolve the instances based on the node type and propagation scheme
+ * 2. Calculate the instance cost for each instance
+ * 3. Send the {@link QueryWorkloadRefreshMessage} to the instances
+ */
+ public void propagateWorkloadUpdateMessage(QueryWorkloadConfig
queryWorkloadConfig) {
+ String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
+ for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
+ // Resolve the instances based on the node type and propagation scheme
+ Set<String> instances = resolveInstances(nodeConfig);
+ if (instances.isEmpty()) {
+ String errorMsg = String.format("No instances found for Workload: %s",
queryWorkloadName);
+ LOGGER.warn(errorMsg);
+ continue;
+ }
+ Map<String, InstanceCost> instanceCostMap =
_costSplitter.computeInstanceCostMap(nodeConfig, instances);
+ Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap =
instanceCostMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> new
QueryWorkloadRefreshMessage(queryWorkloadName,
+ QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE,
entry.getValue())));
+ // Send the QueryWorkloadRefreshMessage to the instances
+
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+ }
+ }
+
+ /**
+ * Propagate delete workload refresh message for the given
queryWorkloadConfig
+ * @param queryWorkloadConfig The query workload configuration to delete
+ * 1. Resolve the instances based on the node type and propagation scheme
+ * 2. Send the {@link QueryWorkloadRefreshMessage} with
DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE to the instances
+ */
+ public void propagateDeleteWorkloadMessage(QueryWorkloadConfig
queryWorkloadConfig) {
+ String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
+ for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
+ Set<String> instances = resolveInstances(nodeConfig);
+ if (instances.isEmpty()) {
+ String errorMsg = String.format("No instances found for Workload: %s",
queryWorkloadName);
+ LOGGER.warn(errorMsg);
+ continue;
+ }
+ Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap =
instances.stream()
+ .collect(Collectors.toMap(instance -> instance, instance -> new
QueryWorkloadRefreshMessage(queryWorkloadName,
+ QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE,
null)));
+
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+ }
+ }
+
+ /**
+ * Propagate the workload for the given table name, it does fast exits if
queryWorkloadConfigs is empty
+ * @param tableName The table name to propagate the workload for, it can be
a rawTableName or a tableNameWithType
+ * if rawTableName is provided, it will resolve all available tableTypes and
propagate the workload for each tableType
+ *
+ * This method performs the following steps:
+ * 1. Find all the helix tags associated with the table
+ * 2. Find all the {@link QueryWorkloadConfig} associated with the helix tags
+ * 3. Propagate the workload cost for instances associated with the workloads
+ */
+ public void propagateWorkloadFor(String tableName) {
+ try {
+ List<QueryWorkloadConfig> queryWorkloadConfigs =
_pinotHelixResourceManager.getAllQueryWorkloadConfigs();
+ if (queryWorkloadConfigs.isEmpty()) {
+ return;
+ }
+ // Get the helixTags associated with the table
+ List<String> helixTags =
PropagationUtils.getHelixTagsForTable(_pinotHelixResourceManager, tableName);
+ // Find all workloads associated with the helix tags
+ Set<QueryWorkloadConfig> queryWorkloadConfigsForTags =
+
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager,
helixTags, queryWorkloadConfigs);
+ // Propagate the workload for each QueryWorkloadConfig
+ for (QueryWorkloadConfig queryWorkloadConfig :
queryWorkloadConfigsForTags) {
+ propagateWorkloadUpdateMessage(queryWorkloadConfig);
+ }
+ } catch (Exception e) {
+ String errorMsg = String.format("Failed to propagate workload for table:
%s", tableName);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ /**
+ * Get all the workload costs associated with the given instance and node
type
+ * 1. Find all the helix tags associated with the instance
+ * 2. Find all the {@link QueryWorkloadConfig} associated with the helix tags
+ * 3. Find the instance associated with the {@link QueryWorkloadConfig} and
node type
+ *
+ * @param instanceName The instance name to get the workload costs for
+ * @return A map of workload name to {@link InstanceCost} for the given
instance and node type
+ */
+ public Map<String, InstanceCost> getWorkloadToInstanceCostFor(String
instanceName) {
+ try {
+ Map<String, InstanceCost> workloadToInstanceCostMap = new HashMap<>();
+ List<QueryWorkloadConfig> queryWorkloadConfigs =
_pinotHelixResourceManager.getAllQueryWorkloadConfigs();
+ if (queryWorkloadConfigs.isEmpty()) {
+ LOGGER.warn("No query workload configs found in zookeeper");
+ return workloadToInstanceCostMap;
+ }
+ // Find all the helix tags associated with the instance
+ InstanceConfig instanceConfig =
_pinotHelixResourceManager.getHelixInstanceConfig(instanceName);
+ if (instanceConfig == null) {
+ LOGGER.warn("Instance config not found for instance: {}",
instanceName);
+ return workloadToInstanceCostMap;
+ }
+ NodeConfig.Type nodeType;
+ if (InstanceTypeUtils.isServer(instanceName)) {
+ nodeType = NodeConfig.Type.SERVER_NODE;
+ } else if (InstanceTypeUtils.isBroker(instanceName)) {
+ nodeType = NodeConfig.Type.BROKER_NODE;
+ } else {
+ LOGGER.warn("Unsupported instance type: {}, cannot compute workload
costs", instanceName);
+ return workloadToInstanceCostMap;
+ }
+
+ // Find all workloads associated with the helix tags
+ Set<QueryWorkloadConfig> queryWorkloadConfigsForTags =
+
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager,
instanceConfig.getTags(),
+ queryWorkloadConfigs);
+ // Calculate the instance cost from each workload
+ for (QueryWorkloadConfig queryWorkloadConfig :
queryWorkloadConfigsForTags) {
+ for (NodeConfig nodeConfig : queryWorkloadConfig.getNodeConfigs()) {
+ if (nodeConfig.getNodeType() == nodeType) {
+ Set<String> instances = resolveInstances(nodeConfig);
+ InstanceCost instanceCost =
_costSplitter.computeInstanceCost(nodeConfig, instances, instanceName);
+ if (instanceCost != null) {
+
workloadToInstanceCostMap.put(queryWorkloadConfig.getQueryWorkloadName(),
instanceCost);
+ }
+ break;
+ }
+ }
+ }
+ return workloadToInstanceCostMap;
+ } catch (Exception e) {
+ String errorMsg = String.format("Failed to get workload to instance cost
map for instance: %s", instanceName);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ private Set<String> resolveInstances(NodeConfig nodeConfig) {
+ PropagationScheme propagationScheme =
+
_propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme().getPropagationType());
+ return propagationScheme.resolveInstances(nodeConfig);
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/DefaultPropagationScheme.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/DefaultPropagationScheme.java
new file mode 100644
index 0000000000..e9a373733b
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/DefaultPropagationScheme.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+
+
+public class DefaultPropagationScheme implements PropagationScheme {
+
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
+
+ public DefaultPropagationScheme(PinotHelixResourceManager
pinotHelixResourceManager) {
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ }
+
+ @Override
+ public Set<String> resolveInstances(NodeConfig nodeConfig) {
+ Set<String> instances;
+ NodeConfig.Type nodeType = nodeConfig.getNodeType();
+ switch (nodeType) {
+ case BROKER_NODE:
+ instances = new
HashSet<>(_pinotHelixResourceManager.getAllBrokerInstances());
+ break;
+ case SERVER_NODE:
+ instances = new
HashSet<>(_pinotHelixResourceManager.getAllServerInstances());
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid node type: " + nodeType);
+ }
+ return instances;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationScheme.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationScheme.java
new file mode 100644
index 0000000000..d73602d6ec
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationScheme.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import java.util.Set;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+
/**
 * PropagationScheme is used to resolve instances based on the {@link NodeConfig}.
 * 1. It helps to identify which instances to propagate the workload to based on the node configuration
 * 2. It helps determine among which instances the
 *    {@link org.apache.pinot.spi.config.workload.EnforcementProfile} should be split
 */
public interface PropagationScheme {
  /**
   * Resolve the instances based on the node type and node configuration.
   *
   * @param nodeConfig the {@link NodeConfig} to resolve the instances for
   * @return the set of instance names to propagate the workload to
   */
  Set<String> resolveInstances(NodeConfig nodeConfig);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationSchemeProvider.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationSchemeProvider.java
new file mode 100644
index 0000000000..d3e2eea167
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationSchemeProvider.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+
+public class PropagationSchemeProvider {
+
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
+
+ public PropagationSchemeProvider(PinotHelixResourceManager
pinotHelixResourceManager) {
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ }
+
+ public PropagationScheme getPropagationScheme(
+ org.apache.pinot.spi.config.workload.PropagationScheme.Type
schemeType) {
+ switch (schemeType) {
+ case TABLE:
+ return new TablePropagationScheme(_pinotHelixResourceManager);
+ case TENANT:
+ return new TenantPropagationScheme(_pinotHelixResourceManager);
+ default:
+ return new DefaultPropagationScheme(_pinotHelixResourceManager);
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java
new file mode 100644
index 0000000000..fa720e9e73
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationScheme;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
/**
 * This class provides utility methods for workload propagation.
 */
public class PropagationUtils {

  // Utility class; not meant to be instantiated.
  private PropagationUtils() {
  }

  /**
   * Get the mapping tableNameWithType -> {BROKER_NODE -> brokerTag, SERVER_NODE -> (serverTag + overrides)}
   * 1. Get all table configs from the PinotHelixResourceManager
   * 2. For each table config, extract the tenant config
   * 3. For each tenant config, get the broker and server tags
   * 4. Populate the helix tags for BROKER_NODE and SERVER_NODE separately
   */
  public static Map<String, Map<NodeConfig.Type, Set<String>>> getTableToHelixTags(
      PinotHelixResourceManager pinotResourceManager) {
    Map<String, Map<NodeConfig.Type, Set<String>>> tableToTags = new HashMap<>();
    for (TableConfig tableConfig : pinotResourceManager.getAllTableConfigs()) {
      TenantConfig tenantConfig = tableConfig.getTenantConfig();
      TableType tableType = tableConfig.getTableType();

      // Gather all relevant tags for this tenant (broker tag plus server tags, including
      // completed/consuming overrides for realtime tables).
      List<String> tenantTags = new ArrayList<>();
      collectHelixTagsForTable(tenantTags, tenantConfig, tableType);

      // Populate the helix tags for BROKER_NODE and SERVER_NODE separately to provide flexibility
      // in workload propagation to direct the workload to only specific node types
      String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker());
      Set<String> brokerTags = Collections.singleton(brokerTag);

      // Server tags = everything collected minus the broker tag.
      Set<String> serverTags = new HashSet<>(tenantTags);
      serverTags.remove(brokerTag);

      Map<NodeConfig.Type, Set<String>> nodeTypeToTags = new EnumMap<>(NodeConfig.Type.class);
      nodeTypeToTags.put(NodeConfig.Type.BROKER_NODE, brokerTags);
      nodeTypeToTags.put(NodeConfig.Type.SERVER_NODE, serverTags);

      tableToTags.put(tableConfig.getTableName(), nodeTypeToTags);
    }
    return tableToTags;
  }

  // Appends the broker tag and the server tag(s) for the given tenant/table type to {@code tags}.
  // For realtime tables, both the consuming and completed server tags are added when they differ.
  private static void collectHelixTagsForTable(List<String> tags, TenantConfig tenantConfig, TableType tableType) {
    tags.add(TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker()));
    if (tableType == TableType.OFFLINE) {
      tags.add(TagNameUtils.getOfflineTagForTenant(tenantConfig.getServer()));
    } else {
      // Returns the realtime tag if completed server tag is not set
      String completedServerTag = TagNameUtils.extractCompletedServerTag(tenantConfig);
      // Returns the realtime tag if consuming server tag is not set
      String consumingServerTag = TagNameUtils.extractConsumingServerTag(tenantConfig);
      if (completedServerTag.equals(consumingServerTag)) {
        tags.add(completedServerTag);
      } else {
        tags.add(consumingServerTag);
        tags.add(completedServerTag);
      }
    }
  }

  /**
   * Get the helix tags for a given table name.
   * If the table name does not have a type suffix, it will return both offline and realtime tags.
   */
  public static List<String> getHelixTagsForTable(PinotHelixResourceManager pinotResourceManager, String tableName) {
    List<String> combinedTags = new ArrayList<>();
    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
    // Expand a raw table name into both type-suffixed forms; keep as-is when a type is present.
    List<String> tablesWithType = (tableType == null)
        ? Arrays.asList(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
        TableNameBuilder.REALTIME.tableNameWithType(tableName))
        : Collections.singletonList(tableName);
    for (String table : tablesWithType) {
      TableConfig tableConfig = pinotResourceManager.getTableConfig(table);
      // Tables without a config (e.g. only one of OFFLINE/REALTIME exists) are skipped silently.
      if (tableConfig != null) {
        collectHelixTagsForTable(combinedTags, tableConfig.getTenantConfig(), tableConfig.getTableType());
      }
    }
    return combinedTags;
  }

  /**
   * Get the mapping between helix tag -> instances
   */
  public static Map<String, Set<String>> getHelixTagToInstances(PinotHelixResourceManager pinotResourceManager) {
    Map<String, Set<String>> tagToInstances = new HashMap<>();
    for (InstanceConfig instanceConfig : pinotResourceManager.getAllHelixInstanceConfigs()) {
      String instanceName = instanceConfig.getInstanceName();
      // An instance may carry multiple tags; register it under each one.
      for (String helixTag : instanceConfig.getTags()) {
        tagToInstances.computeIfAbsent(helixTag, tag -> new HashSet<>()).add(instanceName);
      }
    }
    return tagToInstances;
  }

  /**
   * Returns the set of {@link QueryWorkloadConfig}s that match any of the given Helix tags.
   *
   * This method filters the provided list of QueryWorkloadConfigs based on whether their propagation
   * targets intersect with the specified `filterTags`. The matching is performed based on the
   * propagation type defined for each node in the config:
   *
   * - For {@code TENANT} propagation:
   *   1. Each value in the propagation scheme is treated as a tenant name or direct Helix tag.
   *   2. If the value is a recognized Helix tag (broker/server), it is used directly.
   *   3. Otherwise, the value is resolved to possible broker and server tags for the tenant.
   *   4. If any resolved tag matches one of the `filterTags`, the config is included.
   *
   * - For {@code TABLE} propagation:
   *   1. Table names are expanded to include type-suffixed forms (OFFLINE and/or REALTIME).
   *   2. These table names are mapped to corresponding Helix tags using node type.
   *   3. If any of the mapped tags intersect with `filterTags`, the config is included.
   *
   * @param pinotHelixResourceManager The resource manager used to look up table and instance metadata.
   * @param filterTags The set of Helix tags used for filtering configs.
   * @param queryWorkloadConfigs The full list of workload configs to evaluate.
   * @return A set of workload configs whose propagation targets intersect with the filterTags.
   */
  public static Set<QueryWorkloadConfig> getQueryWorkloadConfigsForTags(
      PinotHelixResourceManager pinotHelixResourceManager, List<String> filterTags,
      List<QueryWorkloadConfig> queryWorkloadConfigs) {
    Set<QueryWorkloadConfig> matchedConfigs = new HashSet<>();
    // Computed once up front; shared by every TABLE-type propagation check below.
    Map<String, Map<NodeConfig.Type, Set<String>>> tableToHelixTags = getTableToHelixTags(pinotHelixResourceManager);

    for (QueryWorkloadConfig queryWorkloadConfig : queryWorkloadConfigs) {
      for (NodeConfig nodeConfig : queryWorkloadConfig.getNodeConfigs()) {
        PropagationScheme scheme = nodeConfig.getPropagationScheme();

        if (scheme.getPropagationType() == PropagationScheme.Type.TENANT) {
          for (String tenant : scheme.getValues()) {
            // A value that already looks like a concrete Helix tag is used directly;
            // otherwise expand the tenant name to all broker/offline/realtime tags it may map to.
            Set<String> resolvedTags = TagNameUtils.isOfflineServerTag(tenant)
                || TagNameUtils.isRealtimeServerTag(tenant) || TagNameUtils.isBrokerTag(tenant)
                ? Collections.singleton(tenant)
                : new HashSet<>(getAllPossibleHelixTagsFor(tenant));
            if (!Collections.disjoint(resolvedTags, filterTags)) {
              matchedConfigs.add(queryWorkloadConfig);
              // One matching tenant is enough for this node config; stop scanning its values.
              break;
            }
          }
        } else if (scheme.getPropagationType() == PropagationScheme.Type.TABLE) {
          for (String tableName : scheme.getValues()) {
            TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
            // Expand a raw table name into both type-suffixed forms; keep as-is when a type is present.
            List<String> tablesWithType = (tableType == null)
                ? Arrays.asList(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
                TableNameBuilder.REALTIME.tableNameWithType(tableName))
                : Collections.singletonList(tableName);
            for (String tableWithType : tablesWithType) {
              Set<String> resolvedTags = tableToHelixTags
                  .getOrDefault(tableWithType, Collections.emptyMap())
                  .getOrDefault(nodeConfig.getNodeType(), Collections.emptySet());
              if (!Collections.disjoint(resolvedTags, filterTags)) {
                matchedConfigs.add(queryWorkloadConfig);
                // Matched via this table; stop scanning its type-suffixed forms.
                break;
              }
            }
          }
        }
      }
    }
    return matchedConfigs;
  }

  // All Helix tags a bare tenant name could resolve to: broker, offline-server and realtime-server tags.
  private static List<String> getAllPossibleHelixTagsFor(String tenantName) {
    List<String> helixTags = new ArrayList<>();
    helixTags.add(TagNameUtils.getBrokerTagForTenant(tenantName));
    helixTags.add(TagNameUtils.getOfflineTagForTenant(tenantName));
    helixTags.add(TagNameUtils.getRealtimeTagForTenant(tenantName));
    return helixTags;
  }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java
new file mode 100644
index 0000000000..c463eb87db
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+/**
+ * TablePropagationScheme is used to resolve instances based on the {@link
NodeConfig} and {@link NodeConfig.Type}
+ * It resolves the instances based on the table names specified in the node
configuration
+ */
+public class TablePropagationScheme implements PropagationScheme {
+
+ private static PinotHelixResourceManager _pinotHelixResourceManager;
+
+ public TablePropagationScheme(PinotHelixResourceManager
pinotHelixResourceManager) {
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ }
+
+ @Override
+ public Set<String> resolveInstances(NodeConfig nodeConfig) {
+ Set<String> instances = new HashSet<>();
+ List<String> tableNames = nodeConfig.getPropagationScheme().getValues();
+ Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags
+ = PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
+ Map<String, Set<String>> helixTagToInstances
+ =
PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
+ for (String tableName : tableNames) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
+ List<String> tablesWithType = new ArrayList<>();
+ if (tableType == null) {
+ // Get both offline and realtime table names if type is not present.
+
tablesWithType.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+
tablesWithType.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+ } else {
+ tablesWithType.add(tableName);
+ }
+ for (String tableWithType : tablesWithType) {
+ Map<NodeConfig.Type, Set<String>> nodeToHelixTags =
tableWithTypeToHelixTags.get(tableWithType);
+ if (nodeToHelixTags != null) {
+ Set<String> helixTags =
nodeToHelixTags.get(nodeConfig.getNodeType());
+ if (helixTags != null) {
+ for (String helixTag : helixTags) {
+ Set<String> helixInstances = helixTagToInstances.get(helixTag);
+ if (helixInstances != null) {
+ instances.addAll(helixInstances);
+ }
+ }
+ }
+ }
+ }
+ }
+ return instances;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TenantPropagationScheme.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TenantPropagationScheme.java
new file mode 100644
index 0000000000..4564b0c282
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TenantPropagationScheme.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.scheme;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+
+
+/**
+ * TenantPropagationScheme is used to resolve instances based on the {@link
NodeConfig} and {@link NodeConfig.Type}
+ * It resolves the instances based on the tenants specified in the node
configuration
+ */
+public class TenantPropagationScheme implements PropagationScheme {
+
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
+
+ public TenantPropagationScheme(PinotHelixResourceManager
pinotHelixResourceManager) {
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ }
+
+ @Override
+ public Set<String> resolveInstances(NodeConfig nodeConfig) {
+ Map<String, Set<String>> helixTagToInstances
+ =
PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
+ Set<String> allInstances = new HashSet<>();
+ List<String> tenantNames = nodeConfig.getPropagationScheme().getValues();
+ NodeConfig.Type nodeType = nodeConfig.getNodeType();
+ // Get the unique set of helix tags for the tenants
+ Set<String> helixTags = new HashSet<>();
+ for (String tenantName : tenantNames) {
+ if (nodeType == NodeConfig.Type.BROKER_NODE) {
+ helixTags.add(TagNameUtils.getBrokerTagForTenant(tenantName));
+ } else if (nodeType == NodeConfig.Type.SERVER_NODE) {
+ if (TagNameUtils.isOfflineServerTag(tenantName) ||
TagNameUtils.isRealtimeServerTag(tenantName)) {
+ helixTags.add(tenantName);
+ } else {
+ helixTags.add(TagNameUtils.getOfflineTagForTenant(tenantName));
+ helixTags.add(TagNameUtils.getRealtimeTagForTenant(tenantName));
+ }
+ }
+ }
+ // Get the instances for the helix tags
+ for (String helixTag : helixTags) {
+ Set<String> instances = helixTagToInstances.get(helixTag);
+ if (instances != null) {
+ allInstances.addAll(instances);
+ }
+ }
+ return allInstances;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/CostSplitter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/CostSplitter.java
new file mode 100644
index 0000000000..9f7f070fb1
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/CostSplitter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.splitter;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+
/**
 * Interface for splitting the cost of a workload between instances.
 * Implementations decide how a node's {@link org.apache.pinot.spi.config.workload.EnforcementProfile}
 * is divided across the instances it propagates to.
 */
public interface CostSplitter {
  /**
   * Computes the cost for each instance in the given set of instances.
   *
   * @param nodeConfig the node configuration
   * @param instances names of all instances involved
   * @return a map from instance identifier to the cost for that instance
   */
  Map<String, InstanceCost> computeInstanceCostMap(NodeConfig nodeConfig, Set<String> instances);

  /**
   * Computes the cost for a specific instance.
   *
   * @param nodeConfig the node configuration
   * @param instances names of all instances involved
   * @param instance the instance identifier for which to compute the cost
   * @return the cost for the specified instance
   */
  InstanceCost computeInstanceCost(NodeConfig nodeConfig, Set<String> instances, String instance);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/DefaultCostSplitter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/DefaultCostSplitter.java
new file mode 100644
index 0000000000..9e40015a0b
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/workload/splitter/DefaultCostSplitter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload.splitter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.config.workload.EnforcementProfile;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+
+
+public class DefaultCostSplitter implements CostSplitter {
+
+ @Override
+ public Map<String, InstanceCost> computeInstanceCostMap(NodeConfig
nodeConfig, Set<String> instances) {
+ InstanceCost cost = computeInstanceCost(nodeConfig, instances, null);
+ Map<String, InstanceCost> costMap = new HashMap<>();
+ for (String instance : instances) {
+ costMap.put(instance, cost);
+ }
+ return costMap;
+ }
+
+ @Override
+ public InstanceCost computeInstanceCost(NodeConfig nodeConfig, Set<String>
instances, String instance) {
+ long totalInstances = instances.size();
+ EnforcementProfile enforcementProfile = nodeConfig.getEnforcementProfile();
+
+ long cpuCostNs = enforcementProfile.getCpuCostNs() / totalInstances;
+ long memoryCostBytes = enforcementProfile.getMemoryCostBytes() /
totalInstances;
+
+ return new InstanceCost(cpuCostNs, memoryCostBytes);
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/workload/PropagationUtilsTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/workload/PropagationUtilsTest.java
new file mode 100644
index 0000000000..f487e68225
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/workload/PropagationUtilsTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.workload;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.workload.scheme.PropagationUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.workload.EnforcementProfile;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationScheme;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
/**
 * Unit tests for {@code PropagationUtils}, exercising the mappings it derives from a mocked
 * {@link PinotHelixResourceManager}: table -> Helix tags, Helix tag -> instances, and
 * workload configs selected by Helix tags.
 */
public class PropagationUtilsTest {

  // Shared mock; each test stubs the calls it needs. Created once in @BeforeClass, so
  // stubbings accumulate across tests (later stubs override earlier ones for the same call).
  private PinotHelixResourceManager _pinotHelixResourceManager;

  @BeforeClass
  public void setUp() {
    _pinotHelixResourceManager = Mockito.mock(PinotHelixResourceManager.class);
  }

  /**
   * Verifies that each table maps to its server tag (suffixed with table type) and its
   * broker tag (suffixed with _BROKER), keyed by node type.
   */
  @Test
  public void getTableToHelixTagsTest() {
    // Create a list of table configurations
    List<TableConfig> tableConfigs = new ArrayList<>();
    tableConfigs.add(createTableConfig("table1", "serverTag1", "brokerTenant1", TableType.OFFLINE));
    tableConfigs.add(createTableConfig("table2", "serverTag2", "brokerTenant2", TableType.REALTIME));
    // Mock the behavior of getAllTableConfigs to return the list of table configurations
    Mockito.when(_pinotHelixResourceManager.getAllTableConfigs()).thenReturn(tableConfigs);
    // Call the method to get table to Helix tags
    Map<String, Map<NodeConfig.Type, Set<String>>> tableToHelixTags
        = PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
    // Verify the results
    Map<String, Map<NodeConfig.Type, Set<String>>> expectedTags = new HashMap<>();
    expectedTags.put("table1_OFFLINE", new HashMap<>() {{
      put(NodeConfig.Type.SERVER_NODE, Set.of("serverTag1_OFFLINE"));
      put(NodeConfig.Type.BROKER_NODE, Set.of("brokerTenant1_BROKER"));
    }});
    expectedTags.put("table2_REALTIME", new HashMap<>() {{
      put(NodeConfig.Type.SERVER_NODE, Set.of("serverTag2_REALTIME"));
      put(NodeConfig.Type.BROKER_NODE, Set.of("brokerTenant2_BROKER"));
    }});

    Assert.assertEquals(tableToHelixTags.size(), expectedTags.size(),
        "Expected size of table to helix tags mapping does not match");
    for (Map.Entry<String, Map<NodeConfig.Type, Set<String>>> tableEntry : expectedTags.entrySet()) {
      String tableName = tableEntry.getKey();
      Map<NodeConfig.Type, Set<String>> expectedNodeTags = tableEntry.getValue();
      // For each node type in the expected map, assert the tag exists
      for (Map.Entry<NodeConfig.Type, Set<String>> entry : expectedNodeTags.entrySet()) {
        NodeConfig.Type nodeType = entry.getKey();
        Set<String> expectedTag = entry.getValue();
        Assert.assertEquals(tableToHelixTags.get(tableName).get(nodeType), expectedTag,
            "Expected " + expectedTag + " for " + tableName + " with node type " + nodeType
                + " but found " + tableToHelixTags.get(tableName).get(nodeType));
      }
    }
  }

  /**
   * Verifies that the per-table tag lookup contains both the server and broker Helix tags
   * for OFFLINE and REALTIME variants of the same raw table name.
   */
  @Test
  public void getHelixTagsForTableTest() {
    // Mock the behavior of getHelixTagsForTable to return a set of helix tags
    TableConfig tableConfig = createTableConfig("table1", "serverTag1", "brokerTenant1", TableType.OFFLINE);
    TableConfig tableConfig2 = createTableConfig("table1", "serverTag2", "brokerTenant2", TableType.REALTIME);
    Mockito.when(_pinotHelixResourceManager.getTableConfig("table1_OFFLINE")).thenReturn(tableConfig);
    Mockito.when(_pinotHelixResourceManager.getTableConfig("table1_REALTIME")).thenReturn(tableConfig2);

    // Define the expected helix tags for the table
    Map<String, Set<String>> expected = new HashMap<>();
    expected.put("table1_OFFLINE", Set.of("serverTag1_OFFLINE", "brokerTenant1_BROKER"));
    expected.put("table1_REALTIME", Set.of("serverTag2_REALTIME", "brokerTenant2_BROKER"));

    for (Map.Entry<String, Set<String>> entry : expected.entrySet()) {
      String tableName = entry.getKey();
      Set<String> expectedHelixTags = entry.getValue();
      // Call the method to get helix tags for the table
      List<String> helixTags = PropagationUtils.getHelixTagsForTable(_pinotHelixResourceManager,
          tableName);
      // Verify the results
      for (String helixTag : expectedHelixTags) {
        Assert.assertTrue(helixTags.contains(helixTag),
            "Expected helix tag " + helixTag + " for table " + tableName + " but found " + helixTags);
      }
    }
  }

  /**
   * Verifies the reverse mapping from a Helix tag to the set of instances carrying that tag.
   */
  @Test
  public void getHelixTagToInstancesTest() {
    // Create a list of instance configurations
    List<InstanceConfig> instanceConfigs = List.of(
        createInstanceConfig("instance1", List.of("serverTag1_OFFLINE")),
        createInstanceConfig("instance2", List.of("serverTag1_OFFLINE")),
        createInstanceConfig("instance3", List.of("brokerTenant1_BROKER")),
        createInstanceConfig("instance4", List.of("brokerTenant1_BROKER"))
    );
    Mockito.when(_pinotHelixResourceManager.getAllHelixInstanceConfigs()).thenReturn(instanceConfigs);
    // Call the method to get Helix tag to instances mapping
    Map<String, Set<String>> helixTagToInstances
        = PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
    // Verify the results
    Map<String, Set<String>> expected = new HashMap<>();
    expected.put("serverTag1_OFFLINE", Set.of("instance1", "instance2"));
    expected.put("brokerTenant1_BROKER", Set.of("instance3", "instance4"));

    Assert.assertEquals(helixTagToInstances.size(), expected.size(),
        "Expected size of helix tag to instances mapping does not match");
    for (Map.Entry<String, Set<String>> entry : expected.entrySet()) {
      String helixTag = entry.getKey();
      Set<String> expectedInstances = entry.getValue();
      Assert.assertTrue(helixTagToInstances.containsKey(helixTag),
          "Expected helix tag " + helixTag + " but not found in the mapping");
      for (String instance : expectedInstances) {
        Assert.assertTrue(helixTagToInstances.get(helixTag).contains(instance),
            "Expected instance " + instance + " for helix tag " + helixTag + " but found "
                + helixTagToInstances.get(helixTag));
      }
    }
  }

  /**
   * Verifies that workload configs are selected when any of their propagation targets
   * (tables via TABLE scheme, tenants via TENANT scheme — with or without type suffixes)
   * resolve to one of the given Helix tags.
   */
  @Test
  public void getQueryWorkloadConfigsForTagsTest() {
    // Create a list of query workload configurations
    QueryWorkloadConfig workloadConfig1 = createQueryWorkloadConfig("workload1",
        new PropagationScheme(PropagationScheme.Type.TABLE, List.of("table1", "table2")),
        new PropagationScheme(PropagationScheme.Type.TABLE, List.of("table1", "table2")));
    QueryWorkloadConfig workloadConfig2 = createQueryWorkloadConfig("workload2",
        new PropagationScheme(PropagationScheme.Type.TENANT, List.of("serverTag1")),
        new PropagationScheme(PropagationScheme.Type.TENANT, List.of("brokerTenant1_BROKER")));
    QueryWorkloadConfig workloadConfig3 = createQueryWorkloadConfig("workload3",
        new PropagationScheme(PropagationScheme.Type.TENANT, List.of("serverTag2_REALTIME")),
        new PropagationScheme(PropagationScheme.Type.TENANT, List.of("brokerTenant2")));
    List<QueryWorkloadConfig> queryWorkloadConfigs = List.of(workloadConfig1, workloadConfig2, workloadConfig3);
    // Create TableConfig for the workload
    List<TableConfig> tableConfigs = List.of(
        createTableConfig("table1", "serverTag1", "brokerTenant1", TableType.OFFLINE),
        createTableConfig("table2", "serverTag2", "brokerTenant2", TableType.REALTIME)
    );
    // Mock the behavior of getAllTableConfigs to return the list of table configurations
    Mockito.when(_pinotHelixResourceManager.getAllTableConfigs()).thenReturn(tableConfigs);

    List<String> helixTags = List.of("serverTag1_OFFLINE", "brokerTenant1_BROKER",
        "serverTag2_REALTIME", "brokerTenant2_BROKER");
    Set<QueryWorkloadConfig> workloadConfigsForTags =
        PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, helixTags,
            queryWorkloadConfigs);
    // Verify the results
    Set<QueryWorkloadConfig> expectedWorkloadConfigs = Set.of(workloadConfig1, workloadConfig2, workloadConfig3);
    Assert.assertEquals(workloadConfigsForTags.size(), expectedWorkloadConfigs.size(),
        "Expected size of workload configs for tags does not match");
    for (QueryWorkloadConfig workloadConfig : workloadConfigsForTags) {
      Assert.assertTrue(expectedWorkloadConfigs.contains(workloadConfig),
          "Expected workload config " + workloadConfig.getQueryWorkloadName()
              + " but not found in the expected set");
    }
  }

  // Builds a minimal table config with the given server/broker tenants and type.
  private TableConfig createTableConfig(String tableName, String serverTag, String brokerTenant, TableType type) {
    return new TableConfigBuilder(type)
        .setTableName(tableName)
        .setSegmentAssignmentStrategy("BalanceNumSegmentAssignmentStrategy")
        .setNumReplicas(1)
        .setBrokerTenant(brokerTenant)
        .setServerTenant(serverTag)
        .setLoadMode("HEAP")
        .setSegmentVersion(null)
        .build();
  }

  // Builds a Helix InstanceConfig carrying the given tags.
  private InstanceConfig createInstanceConfig(String instanceName, List<String> helixTags) {
    InstanceConfig instanceConfig = new InstanceConfig(instanceName);
    for (String helixTag : helixTags) {
      instanceConfig.addTag(helixTag);
    }
    return instanceConfig;
  }

  // Builds a workload config with one server-node and one broker-node config sharing a
  // small enforcement profile; only the propagation schemes vary per test.
  private QueryWorkloadConfig createQueryWorkloadConfig(String name, PropagationScheme serverScheme,
      PropagationScheme brokerScheme) {
    EnforcementProfile enforcementProfile = new EnforcementProfile(10, 10);
    return new QueryWorkloadConfig(name, List.of(
        new NodeConfig(NodeConfig.Type.SERVER_NODE, enforcementProfile, serverScheme),
        new NodeConfig(NodeConfig.Type.BROKER_NODE, enforcementProfile, brokerScheme)
    ));
  }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 2fa066e991..6dec9b7903 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -99,6 +99,10 @@ public class Actions {
public static final String UPDATE_INSTANCE_PARTITIONS =
"UpdateInstancePartitions";
public static final String GET_RESPONSE_STORE = "GetResponseStore";
public static final String DELETE_RESPONSE_STORE = "DeleteResponseStore";
+ public static final String GET_QUERY_WORKLOAD_CONFIG =
"GetQueryWorkloadConfig";
+ public static final String GET_INSTANCE_QUERY_WORKLOAD_CONFIG =
"GetInstanceQueryWorkloadConfig";
+ public static final String UPDATE_QUERY_WORKLOAD_CONFIG =
"UpdateQueryWorkloadConfig";
+ public static final String DELETE_QUERY_WORKLOAD_CONFIG =
"DeleteQueryWorkloadConfig";
public static final String GET_GROOVY_STATIC_ANALYZER_CONFIG =
"GetGroovyStaticAnalyzerConfig";
public static final String UPDATE_GROOVY_STATIC_ANALYZER_CONFIG =
"UpdateGroovyStaticAnalyzerConfig";
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 76d4a3a39b..adb8509090 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -721,6 +721,12 @@ public abstract class BaseServerStarter implements
ServiceStartable {
new SegmentMessageHandlerFactory(instanceDataManager, serverMetrics);
_helixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
messageHandlerFactory);
+ // Query workload message handler factory
+ QueryWorkloadMessageHandlerFactory queryWorkloadMessageHandlerFactory =
+ new QueryWorkloadMessageHandlerFactory(serverMetrics);
+ _helixManager.getMessagingService()
+
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+ queryWorkloadMessageHandlerFactory);
serverMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () ->
_helixManager.isConnected() ? 1L : 0L);
_helixManager.addPreConnectCallback(
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java
new file mode 100644
index 0000000000..bdf91fc7ff
--- /dev/null
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/QueryWorkloadMessageHandlerFactory.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.pinot.common.messages.QueryWorkloadRefreshMessage;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.workload.InstanceCost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * Helix {@link MessageHandlerFactory} that handles query-workload refresh/delete
 * user-defined messages on the server.
 * <p>
 * NOTE(review): this factory is registered for the shared USER_DEFINE_MSG message type and
 * throws for any other subtype; confirm that Helix routes only the intended subtypes here
 * when multiple factories are registered for the same message type.
 */
public class QueryWorkloadMessageHandlerFactory implements MessageHandlerFactory {
  private static final Logger LOGGER = LoggerFactory.getLogger(QueryWorkloadMessageHandlerFactory.class);
  // Not read in this class yet — presumably retained for future metric emission; verify.
  private final ServerMetrics _metrics;

  public QueryWorkloadMessageHandlerFactory(ServerMetrics metrics) {
    _metrics = metrics;
  }

  /**
   * Creates a handler for REFRESH/DELETE query-workload messages.
   *
   * @throws IllegalArgumentException if the message subtype is not a query-workload subtype
   */
  @Override
  public MessageHandler createHandler(Message message, NotificationContext context) {
    String messageType = message.getMsgSubType();
    if (messageType.equals(QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)
        || messageType.equals(QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
      return new QueryWorkloadRefreshMessageHandler(new QueryWorkloadRefreshMessage(message), context);
    } else {
      throw new IllegalArgumentException("Unknown message subtype: " + messageType);
    }
  }

  // Gets called once during start up. We must return the same message type that this factory is registered for.
  @Override
  public String getMessageType() {
    return Message.MessageType.USER_DEFINE_MSG.toString();
  }

  @Override
  public void reset() {
    LOGGER.info("Reset called");
  }

  /**
   * Handler for a single query-workload refresh/delete message. Currently a no-op that
   * reports success; the actual refresh/delete wiring is pending (see TODO below).
   */
  private static class QueryWorkloadRefreshMessageHandler extends MessageHandler {
    final String _queryWorkloadName;
    final InstanceCost _instanceCost;

    QueryWorkloadRefreshMessageHandler(QueryWorkloadRefreshMessage queryWorkloadRefreshMessage,
        NotificationContext context) {
      super(queryWorkloadRefreshMessage, context);
      _queryWorkloadName = queryWorkloadRefreshMessage.getQueryWorkloadName();
      _instanceCost = queryWorkloadRefreshMessage.getInstanceCost();
    }

    @Override
    public HelixTaskResult handleMessage() {
      // TODO: Add logic to invoke the query workload manager to refresh/delete the query workload config
      HelixTaskResult result = new HelixTaskResult();
      result.setSuccess(true);
      return result;
    }

    @Override
    public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) {
      LOGGER.error("Got error while refreshing query workload config for query workload: {} (error code: {},"
          + " error type: {})", _queryWorkloadName, errorCode, errorType, e);
    }
  }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java
new file mode 100644
index 0000000000..b520e6c4a5
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/EnforcementProfile.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.workload;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+/**
+ * Defines the resource enforcement profile for a node within a query workload.
+ * <p>
+ * This profile specifies the maximum CPU time (in nanoseconds) and maximum
memory (in bytes)
+ * that queries under this workload are allowed to consume on the node.
+ * </p>
+ *
+ * @see QueryWorkloadConfig
+ * @see NodeConfig
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class EnforcementProfile extends BaseJsonConfig {
+
+ private static final String CPU_COST_NS = "cpuCostNs";
+ private static final String MEMORY_COST_BYTES = "memoryCostBytes";
+
+ @JsonPropertyDescription("Max CPU cost allowed for the workload")
+ private long _cpuCostNs;
+ @JsonPropertyDescription("Max memory cost allowed for the workload")
+ private long _memoryCostBytes;
+
+ /**
+ * Constructs an EnforcementProfile with specified resource limits.
+ *
+ * @param cpuCostNs maximum allowed CPU cost in nanoseconds for the workload
+ * @param memoryCostBytes maximum allowed memory cost in bytes for the
workload
+ */
+ public EnforcementProfile(@JsonProperty(CPU_COST_NS) long cpuCostNs,
+ @JsonProperty(MEMORY_COST_BYTES) long
memoryCostBytes) {
+ _cpuCostNs = cpuCostNs;
+ _memoryCostBytes = memoryCostBytes;
+ }
+
+ /**
+ * Returns the maximum CPU cost allowed for this workload.
+ *
+ * @return CPU cost limit in nanoseconds
+ */
+ public long getCpuCostNs() {
+ return _cpuCostNs;
+ }
+
+ /**
+ * Returns the maximum memory cost allowed for this workload.
+ *
+ * @return memory cost limit in bytes
+ */
+ public long getMemoryCostBytes() {
+ return _memoryCostBytes;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/InstanceCost.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/InstanceCost.java
new file mode 100644
index 0000000000..a7befaaed1
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/InstanceCost.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+
+/**
+ * Represents the resource cost profile of an instance for query workload
enforcement.
+ * <p>
+ * This class defines the CPU and memory cost thresholds that an instance can
incur
+ * when executing queries, used when propagating workload configurations to
individual instances.
+ * </p>
+ *
+ */
+public class InstanceCost {
+
+ private static final String CPU_COST_NS = "cpuCostNs";
+ private static final String MEMORY_COST_BYTES = "memoryCostBytes";
+
+ /**
+ * The CPU cost threshold for the instance, in nanoseconds.
+ */
+ @JsonPropertyDescription("CPU cost of the instance in nanoseconds")
+ private long _cpuCostNs;
+ /**
+ * The memory cost threshold for the instance, in bytes.
+ */
+ @JsonPropertyDescription("Memory cost of the instance in bytes")
+ private long _memoryCostBytes;
+
+ /**
+ * Constructs an InstanceCost profile with specified CPU and memory
thresholds.
+ *
+ * @param cpuCostNs CPU cost threshold in nanoseconds for this instance
+ * @param memoryCostBytes memory cost threshold in bytes for this instance
+ */
+ @JsonCreator
+ public InstanceCost(@JsonProperty(CPU_COST_NS) long cpuCostNs,
+ @JsonProperty(MEMORY_COST_BYTES) long memoryCostBytes) {
+ _cpuCostNs = cpuCostNs;
+ _memoryCostBytes = memoryCostBytes;
+ }
+
+ /**
+ * Returns the CPU cost threshold for this instance.
+ *
+ * @return CPU cost limit in nanoseconds
+ */
+ public long getCpuCostNs() {
+ return _cpuCostNs;
+ }
+
+ /**
+ * Returns the memory cost threshold for this instance.
+ *
+ * @return memory cost limit in bytes
+ */
+ public long getMemoryCostBytes() {
+ return _memoryCostBytes;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/NodeConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/NodeConfig.java
new file mode 100644
index 0000000000..25be645191
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/NodeConfig.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.fasterxml.jackson.annotation.JsonValue;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+/**
+ * Represents the configuration for a specific node type in a query workload.
+ * <p>
+ * Each NodeConfig specifies:
+ * <ul>
+ * <li><strong>Node Type:</strong> The role of the node in processing
queries.</li>
+ * <li><strong>Enforcement Profile:</strong> Resource limits (CPU and
memory) applied to this node.</li>
+ * <li><strong>Propagation Scheme:</strong> Optional instructions for
cascading configs to downstream nodes.</li>
+ * </ul>
+ * </p>
+ * <p>
+ * This class is used within {@link QueryWorkloadConfig} to define per-node
settings
+ * that tailor query execution behavior based on workload classification.
+ * </p>
+ *
+ * @see QueryWorkloadConfig
+ * @see EnforcementProfile
+ * @see PropagationScheme
+ */
+public class NodeConfig extends BaseJsonConfig {
+
+ public enum Type {
+ BROKER_NODE("brokerNode"),
+ SERVER_NODE("serverNode");
+
+ private final String _value;
+
+ Type(String jsonValue) {
+ _value = jsonValue;
+ }
+
+ @JsonValue
+ public String getJsonValue() {
+ return _value;
+ }
+
+ @JsonCreator
+ public static Type forValue(String value) {
+ if (value == null) {
+ return null;
+ }
+ // Normalize the input to lower case and trim spaces
+ String normalized = value.toLowerCase().trim();
+ for (Type type : Type.values()) {
+ if (type.getJsonValue().toLowerCase().equals(normalized)) {
+ return type;
+ }
+ }
+ throw new IllegalArgumentException("Invalid node type: " + value);
+ }
+ }
+
+ private static final String NODE_TYPE = "nodeType";
+ private static final String ENFORCEMENT_PROFILE = "enforcementProfile";
+ private static final String PROPAGATION_SCHEME = "propagationScheme";
+
+ /**
+ * The role of this node within the query workload, indicating whether it
directly serves
+ * queries or acts as an intermediate forwarding node.
+ */
+ @JsonPropertyDescription("Describes the type of node")
+ private Type _nodeType;
+
+ /**
+ * The resource enforcement profile for this node, defining limits on CPU
and memory
+ * usage for queries under this workload.
+ */
+ @JsonPropertyDescription("Describes the enforcement profile for the node")
+ private EnforcementProfile _enforcementProfile;
+
+ /**
+ * Optional propagation scheme that specifies how configuration settings are
cascaded
+ * or shared with downstream nodes; may be null if no propagation is applied.
+ */
+ @JsonPropertyDescription("Describes the propagation scheme for the node")
+ private PropagationScheme _propagationScheme;
+
+ @JsonCreator
+ public NodeConfig(
+ @JsonProperty(NODE_TYPE) Type nodeType,
+ @JsonProperty(ENFORCEMENT_PROFILE) EnforcementProfile enforcementProfile,
+ @JsonProperty(PROPAGATION_SCHEME) @Nullable PropagationScheme
propagationScheme) {
+ _nodeType = nodeType;
+ _enforcementProfile = enforcementProfile;
+ _propagationScheme = propagationScheme;
+ }
+
+ public Type getNodeType() {
+ return _nodeType;
+ }
+
+ public EnforcementProfile getEnforcementProfile() {
+ return _enforcementProfile;
+ }
+
+ public PropagationScheme getPropagationScheme() {
+ return _propagationScheme;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/PropagationScheme.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/PropagationScheme.java
new file mode 100644
index 0000000000..ac68bbea68
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/PropagationScheme.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.fasterxml.jackson.annotation.JsonValue;
+import java.util.List;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+/**
+ * Defines how configuration settings are propagated across workloads.
+ * <p>
+ * A PropagationScheme determines the scope and specific values (e.g., tables
or tenants)
+ * to which workload settings should be applied. This allows selective
cascading
+ * of resource and query limits across different instances.
+ * </p>
+ *
+ * @see QueryWorkloadConfig
+ * @see NodeConfig
+ */
+public class PropagationScheme extends BaseJsonConfig {
+
+ /**
+ * Enumerates the propagation scheme types that control the scope of
propagation.
+ * <p>
+ * - TABLE: Propagate settings at the per-table level.<br>
+ * - TENANT: Propagate settings at the tenant (logical group) level.
+ * </p>
+ */
+ public enum Type {
+ /** Propagate workload settings to individual tables. */
+ TABLE("table"),
+ /** Propagate workload settings to all tables under a tenant. */
+ TENANT("tenant");
+
+ private final String _value;
+
+ Type(String value) {
+ _value = value;
+ }
+
+ /**
+ * Returns the JSON string representation of this propagation type.
+ *
+ * @return the JSON value corresponding to this Type (e.g., "table",
"tenant")
+ */
+ @JsonValue
+ public String getJsonValue() {
+ return _value;
+ }
+
+ /**
+ * Parses a JSON string into the corresponding Type enum.
+ * <p>
+ * Accepts case-insensitive and trimmed input matching defined JSON values.
+ * </p>
+ *
+ * @param value JSON string to parse (may be null)
+ * @return the matching Type enum, or null if input is null
+ * @throws IllegalArgumentException if the input does not match any Type
+ */
+ @JsonCreator
+ public static Type forValue(String value) {
+ if (value == null) {
+ return null;
+ }
+ String normalized = value.toLowerCase().trim();
+ for (Type type : Type.values()) {
+ if (type.getJsonValue().equals(normalized)) {
+ return type;
+ }
+ }
+ throw new IllegalArgumentException("Invalid propagation scheme type: " +
value);
+ }
+ }
+
+ private static final String PROPAGATION_TYPE = "propagationType";
+ private static final String VALUES = "values";
+
+ /**
+ * The type of propagation to apply (per-table or per-tenant).
+ */
+ @JsonPropertyDescription("Describes the type of propagation scheme")
+ private Type _propagationType;
+
+ /**
+ * The specific identifiers (table names or tenant names) to which settings
apply.
+ */
+ @JsonPropertyDescription("Describes the values of the propagation scheme")
+ private List<String> _values;
+
+ /**
+ * Constructs a PropagationScheme with the given type and target values.
+ *
+ * @param propagationType the Type of propagation (TABLE or TENANT)
+ * @param values the list of identifiers (tables or tenants) for propagation
+ */
+ @JsonCreator
+ public PropagationScheme(@JsonProperty(PROPAGATION_TYPE) Type
propagationType,
+ @JsonProperty(VALUES) List<String> values) {
+ _propagationType = propagationType;
+ _values = values;
+ }
+
+ /**
+ * Returns the configured propagation type.
+ *
+ * @return the Type enum indicating propagation scope
+ */
+ public Type getPropagationType() {
+ return _propagationType;
+ }
+
+ /**
+ * Returns the list of target identifiers for propagation.
+ *
+ * @return list of table names or tenant names
+ */
+ public List<String> getValues() {
+ return _values;
+ }
+
+ /**
+ * Sets the propagation type.
+ *
+ * @param propagationType new Type to define propagation scope
+ */
+ public void setPropagationType(Type propagationType) {
+ _propagationType = propagationType;
+ }
+
+ /**
+ * Sets the target identifiers for propagation.
+ *
+ * @param values list of table or tenant names to apply settings to
+ */
+ public void setValues(List<String> values) {
+ _values = values;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/QueryWorkloadConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/QueryWorkloadConfig.java
new file mode 100644
index 0000000000..477a7f5668
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/QueryWorkloadConfig.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.List;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+/**
+ * Represents the configuration for a named query workload in a Pinot Helix cluster.
+ * <p>
+ * A QueryWorkload groups a set of nodes and associated configuration parameters under a single workload name.
+ * Workloads are applied at the cluster level: individual queries specify the workload they belong to by
+ * providing the workload name in their query options. This allows us to manage and enforce isolation by limiting
+ * the resources available to queries based on their workload classification.
+ * </p>
+ * <p><strong>Example:</strong></p>
+ * <pre>{@code
+ * {
+ *   "queryWorkloadName": "analytics",
+ *   "nodeConfigs": [
+ *     {
+ *       "nodeType": "brokerNode",
+ *       "enforcementProfile": {
+ *         "cpuCostNs": 1000000,
+ *         "memoryCostBytes": 10000000
+ *       },
+ *       "propagationScheme": {
+ *         "propagationType": "TABLE",
+ *         "values": ["airlineStats"]
+ *       }
+ *     },
+ *     {
+ *       "nodeType": "serverNode",
+ *       "enforcementProfile": {
+ *         "cpuCostNs": 2000000,
+ *         "memoryCostBytes": 20000000
+ *       },
+ *       "propagationScheme": {
+ *         "propagationType": "TENANT",
+ *         "values": ["tenantA", "tenantB"]
+ *       }
+ *     }
+ *   ]
+ * }
+ * }</pre>
+ *
+ * @see NodeConfig
+ */
+public class QueryWorkloadConfig extends BaseJsonConfig {
+
+  public static final String QUERY_WORKLOAD_NAME = "queryWorkloadName";
+  public static final String NODE_CONFIGS = "nodeConfigs";
+
+  // Fields are final: this config is immutable once deserialized (no setters are exposed).
+  @JsonPropertyDescription("Describes the name for the query workload, this should be unique across the zk cluster")
+  private final String _queryWorkloadName;
+
+  @JsonPropertyDescription("Describes the node configs for the query workload")
+  private final List<NodeConfig> _nodeConfigs;
+
+  /**
+   * Constructs a new QueryWorkloadConfig instance.
+   *
+   * @param queryWorkloadName unique name identifying this workload across the cluster
+   * @param nodeConfigs list of per-node configurations for this workload
+   */
+  // NOTE: the javadoc must precede the annotation; a doc comment placed between
+  // @JsonCreator and the constructor is ignored by the javadoc tool.
+  @JsonCreator
+  public QueryWorkloadConfig(@JsonProperty(QUERY_WORKLOAD_NAME) String queryWorkloadName,
+      @JsonProperty(NODE_CONFIGS) List<NodeConfig> nodeConfigs) {
+    _queryWorkloadName = queryWorkloadName;
+    _nodeConfigs = nodeConfigs;
+  }
+
+  /**
+   * Returns the unique name of this query workload.
+   *
+   * @return the workload name used by queries to specify this workload
+   */
+  public String getQueryWorkloadName() {
+    return _queryWorkloadName;
+  }
+
+  /**
+   * Returns the list of node-specific configurations for this workload.
+   *
+   * @return list of NodeConfig objects for this workload
+   */
+  public List<NodeConfig> getNodeConfigs() {
+    return _nodeConfigs;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index bd76f381c0..81af254f52 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -72,6 +72,7 @@ public class CommonConstants {
public static final String DEFAULT_EXECUTORS_FIXED_NUM_THREADS = "-1";
public static final String CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME =
"pinot.tar.compression.codec.name";
+ public static final String QUERY_WORKLOAD = "queryWorkload";
public static final String JFR = "pinot.jfr";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]