vvivekiyer commented on code in PR #16672:
URL: https://github.com/apache/pinot/pull/16672#discussion_r2369292523


##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -58,78 +82,186 @@ public QueryWorkloadManager(PinotHelixResourceManager 
pinotHelixResourceManager)
   }
 
   /**
-   * Propagate the workload to the relevant instances based on the 
PropagationScheme
-   * @param queryWorkloadConfig The query workload configuration to propagate
-   * 1. Resolve the instances based on the node type and propagation scheme
-   * 2. Calculate the instance cost for each instance
-   * 3. Send the {@link QueryWorkloadRefreshMessage} to the instances
+   * Propagates an upsert of a workload's cost configuration to all relevant 
instances.
+   *
+   * <p>
+   * For each {@link NodeConfig} in the supplied {@link QueryWorkloadConfig}, 
this method:
+   * </p>
+   * <ol>
+   *   <li>Resolves the {@link PropagationScheme} from the node's configured 
scheme type.</li>
+   *   <li>Computes the per-instance {@link InstanceCost} map using the 
configured
+   *       {@link CostSplitter}.</li>
+   *   <li>Sends a {@link QueryWorkloadRefreshMessage} with subtype
+   *       {@link 
QueryWorkloadRefreshMessage#REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE} to each
+   *       instance with its computed cost.</li>
+   * </ol>
+   *
+   * <p>
+   * This call is idempotent from the manager's perspective: the same inputs 
will result in the
+   * same set of messages being sent. Instances are expected to apply the new 
costs immediately.
+   * </p>
+   *
+   * <p>
+   *  This call is atomic to the extent possible: if any error occurs while estimating the target
+   *  instances and their costs, the entire propagation is aborted and no partial updates are sent
+   *  to any instances.
+   * </p>
+   *
+   * <p>
+   *  We rely on Helix reliable messaging to ensure message delivery to instances. However, if an
+   *  instance is down during the propagation, it will miss the update; instance-side logic fetches
+   *  the latest workload configs from the controller during startup to recover.
+   * </p>
+   *
+   * @param queryWorkloadConfig The workload definition (name, node types, 
budgets, and propagation
+   *                            scheme) to propagate.
    */
   public void propagateWorkloadUpdateMessage(QueryWorkloadConfig 
queryWorkloadConfig) {
     String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
-    for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
-      // Resolve the instances based on the node type and propagation scheme
-      Set<String> instances = resolveInstances(nodeConfig);
-      if (instances.isEmpty()) {
-        String errorMsg = String.format("No instances found for Workload: %s", 
queryWorkloadName);
-        LOGGER.warn(errorMsg);
-        continue;
+    LOGGER.info("Propagating workload update for: {}", queryWorkloadName);
+
+    Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = new 
HashMap<>();
+    try {
+      for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
+        PropagationScheme propagationScheme = 
_propagationSchemeProvider.getPropagationScheme(
+            nodeConfig.getPropagationScheme().getPropagationType());
+        // For propagation entities with empty cpu or memory cost, distribute 
the remaining cost evenly among them
+        checkAndDistributeEmptyPropagationEntitiesEvenly(nodeConfig);

Review Comment:
   Per the offline discussion, we can remove support for this case. Let's enforce this in validation instead; allowing empty costs just makes the configs difficult to read.
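The validation the comment asks for could look roughly like this minimal sketch. The `PropagationEntity` record and `validateEntities` helper are hypothetical stand-ins for the real Pinot config types, using `null` to model an omitted cost field:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: reject propagation entities that omit cpuCostNs or memoryCostBytes
// at validation time, instead of back-filling missing costs during propagation.
public class PropagationEntityValidation {
  // Hypothetical stand-in for Pinot's PropagationEntity; null means the field was omitted.
  record PropagationEntity(String entity, Long cpuCostNs, Long memoryCostBytes) { }

  static List<String> validateEntities(List<PropagationEntity> entities) {
    List<String> errors = new ArrayList<>();
    for (PropagationEntity e : entities) {
      if (e.cpuCostNs() == null || e.memoryCostBytes() == null) {
        errors.add("Entity '" + e.entity() + "' must specify both cpuCostNs and memoryCostBytes");
      }
    }
    return errors;
  }
}
```

With this in place, `checkAndDistributeEmptyPropagationEntitiesEvenly` becomes unnecessary because configs with empty costs never reach propagation.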



##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java:
##########
@@ -212,4 +212,11 @@ public static boolean 
shouldFetchPreConfiguredInstancePartitions(TableConfig tab
     return hasPreConfiguredInstancePartitions(tableConfig, 
instancePartitionsType)
         && 
!InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, 
instancePartitionsType);
   }
+
+  public static List<String> getAllPossibleInstancePartitionsName(String 
tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);

Review Comment:
   Can we use getInstancePartitionsName(tableName, type) instead? This does not seem to need a new function in InstancePartitionsUtils, and repeating the name-construction logic here risks the two diverging if the name format changes.
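The suggested shape would be roughly the following sketch. `getInstancePartitionsName` here is a simplified stand-in for the existing helper in `InstancePartitionsUtils` (which the comment references), so the `<rawTable>_<TYPE>` format stays built in exactly one place:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: derive every candidate instance-partitions name by delegating to the
// existing per-type helper rather than rebuilding the name format inline.
public class AllPartitionNames {
  // Stand-in for InstancePartitionsUtils.getInstancePartitionsName(tableName, type).
  static String getInstancePartitionsName(String rawTableName, String instancePartitionsType) {
    return rawTableName + "_" + instancePartitionsType;
  }

  static List<String> allPossiblePartitionNames(String rawTableName) {
    List<String> names = new ArrayList<>();
    for (String type : List.of("OFFLINE", "CONSUMING", "COMPLETED")) {
      names.add(getInstancePartitionsName(rawTableName, type));
    }
    return names;
  }
}
```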



##########
pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java:
##########
@@ -151,13 +152,12 @@ public BudgetStats tryCharge(String workload, long 
cpuUsedNs, long memoryUsedByt
   /**
    * Retrieves the remaining budget for a specific workload.
    */
-  public BudgetStats getRemainingBudgetForWorkload(String workload) {
+  public BudgetStats getBudgetStats(String workload) {

Review Comment:
   Can we keep the existing naming as it's more descriptive? The logic change 
inside looks good. 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java:
##########
@@ -82,6 +118,65 @@ public static Map<String, Map<NodeConfig.Type, 
Set<String>>> getTableToHelixTags
     return tableToTags;
   }
 
+  /**
+   * Builds a mapping from instance partition names to the set of instances 
assigned to those partitions.
+   * <p>
+   * This method retrieves all instance partitions from the cluster and 
creates a mapping where each
+   * partition configuration key (instance partition name) maps to the set of 
instances that have been
+   * assigned to that partition.
+   *
+   *
+   * <p>Example return value:
+   * <pre>
+   * {
+   *   "airline_OFFLINE": {"Server_1", "Server_2", "Server_3"},
+   *   "events_CONSUMING": {"Server_6", "Server_7"},
+   *   "events_COMPLETED": {"Server_8", "Server_9", "Server_10"}
+   * }
+   * </pre>
+   *
+   * <p>This mapping is useful for workload propagation schemes that need to understand which
+   * instances are responsible for serving specific tables, enabling fine-grained resource
+   * allocation and cost distribution across the cluster.
+   *
+   */
+  public static Map<String, Set<String>> getPartitionConfigKeyToInstances(
+      PinotHelixResourceManager pinotResourceManager) {
+    Map<String, Set<String>> partitionTypeToInstances = new HashMap<>();
+    List<InstancePartitions> instancePartitionsList = 
pinotResourceManager.getAllInstancePartitions();
+    if (instancePartitionsList == null) {
+      LOGGER.warn("No instance partitions found, returning empty mapping");

Review Comment:
   As mentioned in the other comment, this can be true for balanced. 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -58,78 +82,186 @@ public QueryWorkloadManager(PinotHelixResourceManager 
pinotHelixResourceManager)
   }
 
   /**
-   * Propagate the workload to the relevant instances based on the 
PropagationScheme
-   * @param queryWorkloadConfig The query workload configuration to propagate
-   * 1. Resolve the instances based on the node type and propagation scheme
-   * 2. Calculate the instance cost for each instance
-   * 3. Send the {@link QueryWorkloadRefreshMessage} to the instances
+   * Propagates an upsert of a workload's cost configuration to all relevant 
instances.
+   *
+   * <p>
+   * For each {@link NodeConfig} in the supplied {@link QueryWorkloadConfig}, 
this method:
+   * </p>
+   * <ol>
+   *   <li>Resolves the {@link PropagationScheme} from the node's configured 
scheme type.</li>
+   *   <li>Computes the per-instance {@link InstanceCost} map using the 
configured
+   *       {@link CostSplitter}.</li>
+   *   <li>Sends a {@link QueryWorkloadRefreshMessage} with subtype
+   *       {@link 
QueryWorkloadRefreshMessage#REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE} to each
+   *       instance with its computed cost.</li>
+   * </ol>
+   *
+   * <p>
+   * This call is idempotent from the manager's perspective: the same inputs 
will result in the
+   * same set of messages being sent. Instances are expected to apply the new 
costs immediately.
+   * </p>
+   *
+   * <p>
+   *  This call is atomic to the extent possible: if any error occurs while estimating the target
+   *  instances and their costs, the entire propagation is aborted and no partial updates are sent
+   *  to any instances.
+   * </p>
+   *
+   * <p>
+   *  We rely on Helix reliable messaging to ensure message delivery to instances. However, if an
+   *  instance is down during the propagation, it will miss the update; instance-side logic fetches
+   *  the latest workload configs from the controller during startup to recover.
+   * </p>
+   *
+   * @param queryWorkloadConfig The workload definition (name, node types, 
budgets, and propagation
+   *                            scheme) to propagate.
    */
   public void propagateWorkloadUpdateMessage(QueryWorkloadConfig 
queryWorkloadConfig) {
     String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
-    for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
-      // Resolve the instances based on the node type and propagation scheme
-      Set<String> instances = resolveInstances(nodeConfig);
-      if (instances.isEmpty()) {
-        String errorMsg = String.format("No instances found for Workload: %s", 
queryWorkloadName);
-        LOGGER.warn(errorMsg);
-        continue;
+    LOGGER.info("Propagating workload update for: {}", queryWorkloadName);
+
+    Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = new 
HashMap<>();
+    try {
+      for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
+        PropagationScheme propagationScheme = 
_propagationSchemeProvider.getPropagationScheme(
+            nodeConfig.getPropagationScheme().getPropagationType());
+        // For propagation entities with empty cpu or memory cost, distribute 
the remaining cost evenly among them
+        checkAndDistributeEmptyPropagationEntitiesEvenly(nodeConfig);
+        Map<String, InstanceCost> instanceCostMap = 
propagationScheme.resolveInstanceCostMap(nodeConfig, _costSplitter);
+        if (instanceCostMap.isEmpty()) {
+          // This is to ensure that the configured entity is valid and maps to 
some instances
+          String errorMsg = String.format("No instances found for workload 
update: %s with nodeConfig: %s",
+              queryWorkloadName, nodeConfig);
+          LOGGER.error(errorMsg);
+          throw new RuntimeException(errorMsg);
+        }
+
+        Map<String, QueryWorkloadRefreshMessage> nodeToRefreshMessageMap = 
instanceCostMap.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, entry -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
+                
QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE, 
entry.getValue())));
+        instanceToRefreshMessageMap.putAll(nodeToRefreshMessageMap);
       }
-      Map<String, InstanceCost> instanceCostMap = 
_costSplitter.computeInstanceCostMap(nodeConfig, instances);
-      Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instanceCostMap.entrySet().stream()
-          .collect(Collectors.toMap(Map.Entry::getKey, entry -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
-              QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE, 
entry.getValue())));
-      // Send the QueryWorkloadRefreshMessage to the instances
-      
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      // Sends the message only after all nodeConfigs are processed 
successfully
+      sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      LOGGER.info("Successfully propagated workload update for: {} to {} 
instances", queryWorkloadName,
+          instanceToRefreshMessageMap.size());
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to propagate workload update 
for: %s", queryWorkloadName);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
     }
   }
 
   /**
-   * Propagate delete workload refresh message for the given 
queryWorkloadConfig
-   * @param queryWorkloadConfig The query workload configuration to delete
-   * 1. Resolve the instances based on the node type and propagation scheme
-   * 2. Send the {@link QueryWorkloadRefreshMessage} with 
DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE to the instances
+   * Propagates a delete for the given workload to all relevant instances.
+   *
+   * <p>
+   * The method resolves the target instances for each {@link NodeConfig} and 
sends a
+   * {@link QueryWorkloadRefreshMessage} with subtype
+   * {@link QueryWorkloadRefreshMessage#DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE},
+   * which instructs the instance to remove local state associated with the 
workload and stop enforcing costs for it.
+   * </p>
+   *
+   * @param queryWorkloadConfig The workload to delete (only the name and node 
scoping are used).
    */
   public void propagateDeleteWorkloadMessage(QueryWorkloadConfig 
queryWorkloadConfig) {
     String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
+    LOGGER.info("Propagating workload delete for: {}", queryWorkloadName);
+
     for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
-      Set<String> instances = resolveInstances(nodeConfig);
-      if (instances.isEmpty()) {
-        String errorMsg = String.format("No instances found for Workload: %s", 
queryWorkloadName);
-        LOGGER.warn(errorMsg);
+      if (nodeConfig == null) {
+        LOGGER.warn("Skipping null NodeConfig for workload delete: {}", 
queryWorkloadName);
         continue;
       }
-      Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instances.stream()
-          .collect(Collectors.toMap(instance -> instance, instance -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
-              QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE, 
null)));
-      
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      try {
+        Set<String> instances = resolveInstances(nodeConfig);
+        if (instances.isEmpty()) {
+          LOGGER.warn("No instances found for workload delete: {} with 
nodeConfig: {}", queryWorkloadName, nodeConfig);
+          continue;
+        }
+        QueryWorkloadRefreshMessage deleteMessage = new 
QueryWorkloadRefreshMessage(queryWorkloadName,
+            QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE, 
new InstanceCost(0, 0));
+        Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instances.stream()
+            .collect(Collectors.toMap(instance -> instance, instance -> 
deleteMessage));
+
+        // Send the QueryWorkloadRefreshMessage to the instances
+        sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+        LOGGER.info("Successfully propagated workload delete for: {} to {} 
instances", queryWorkloadName,
+            instances.size());
+      } catch (Exception e) {
+        String errorMsg = String.format("Failed to propagate workload delete 
for: %s with nodeConfig: %s",
+            queryWorkloadName, nodeConfig);
+        LOGGER.error(errorMsg, e);
+        throw new RuntimeException(errorMsg, e);
+      }
     }
   }
 
   /**
-   * Propagate the workload for the given table name, it does fast exits if 
queryWorkloadConfigs is empty
-   * @param tableName The table name to propagate the workload for, it can be 
a rawTableName or a tableNameWithType
-   * if rawTableName is provided, it will resolve all available tableTypes and 
propagate the workload for each tableType
-   *
-   * This method performs the following steps:
-   * 1. Find all the helix tags associated with the table
-   * 2. Find all the {@link QueryWorkloadConfig} associated with the helix tags
-   * 3. Propagate the workload cost for instances associated with the workloads
+   * Propagates workload updates for all workloads that apply to the given 
table.
+   *
+   * <p>
+   * This helper performs the following:
+   * </p>
+   * <ol>
+   *   <li>Fetches all {@link QueryWorkloadConfig}s from Zookeeper.</li>
+   *   <li>Resolves the Helix tags associated with the table (supports raw 
table names and
+   *       type-qualified names).</li>
+   *   <li>Filters the workload configs to those whose scope matches the 
table's tags.</li>
+   *   <li>Invokes {@link 
#propagateWorkloadUpdateMessage(QueryWorkloadConfig)} for each match.</li>
+   * </ol>
+   *
+   * <p>
+   * If no workloads are configured, the method returns immediately. Any 
exception encountered is
+   * logged and rethrown as a {@link RuntimeException}.
+   * </p>
+   *
+   * @param tableName The raw or type-qualified table name (e.g., {@code 
myTable} or
+   *                  {@code myTable_OFFLINE}).
+   * @throws RuntimeException If propagation fails due to Helix/ZK access or 
message dispatch
+   *                          errors.
    */
   public void propagateWorkloadFor(String tableName) {
     try {
       List<QueryWorkloadConfig> queryWorkloadConfigs = 
_pinotHelixResourceManager.getAllQueryWorkloadConfigs();
       if (queryWorkloadConfigs.isEmpty()) {
-          return;
+        return;
       }
       // Get the helixTags associated with the table
       List<String> helixTags = 
PropagationUtils.getHelixTagsForTable(_pinotHelixResourceManager, tableName);
+      if (helixTags.isEmpty()) {
+        LOGGER.warn("No Helix tags found for table: {}, skipping workload 
propagation", tableName);
+        return;
+      }
+
       // Find all workloads associated with the helix tags
       Set<QueryWorkloadConfig> queryWorkloadConfigsForTags =
           
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, 
helixTags, queryWorkloadConfigs);
+
+      if (queryWorkloadConfigsForTags.isEmpty()) {
+        LOGGER.info("No workload configs match table: {}, no propagation 
needed", tableName);
+        return;
+      }
+
       // Propagate the workload for each QueryWorkloadConfig
+      int successCount = 0;
       for (QueryWorkloadConfig queryWorkloadConfig : 
queryWorkloadConfigsForTags) {
-        propagateWorkloadUpdateMessage(queryWorkloadConfig);
+        try {
+          List<String> errors = 
QueryWorkloadConfigUtils.validateQueryWorkloadConfig(queryWorkloadConfig);

Review Comment:
   Why is validation needed here during propagation? 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java:
##########
@@ -19,60 +19,263 @@
 package org.apache.pinot.controller.workload.scheme;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.workload.splitter.CostSplitter;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.workload.InstanceCost;
 import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationEntity;
+import org.apache.pinot.spi.config.workload.PropagationEntityOverrides;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 /**
- * TablePropagationScheme is used to resolve instances based on the {@link 
NodeConfig} and {@link NodeConfig.Type}
- * It resolves the instances based on the table names specified in the node 
configuration
+ * A {@code TablePropagationScheme} resolves Pinot instances based on table 
names in a node
+ * configuration.
+ *
  */
 public class TablePropagationScheme implements PropagationScheme {
 
-  private static PinotHelixResourceManager _pinotHelixResourceManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
 
   public TablePropagationScheme(PinotHelixResourceManager 
pinotHelixResourceManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
   }
 
-  @Override
+  /**
+   * Resolves the union of all instances across all cost splits for the given 
node config.
+   *
+   * Example:
+   * <pre>
+   *   { "Broker_Instance_1", "Broker_Instance_2", "Server_Instance_1" }
+   * </pre>
+   */
   public Set<String> resolveInstances(NodeConfig nodeConfig) {
     Set<String> instances = new HashSet<>();
-    List<String> tableNames = nodeConfig.getPropagationScheme().getValues();
-    Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags
-            = PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
-    Map<String, Set<String>> helixTagToInstances
-            = 
PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
-    for (String tableName : tableNames) {
-      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-      List<String> tablesWithType = new ArrayList<>();
-      if (tableType == null) {
-        // Get both offline and realtime table names if type is not present.
-        
tablesWithType.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
-        
tablesWithType.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+    Map<String, Set<String>> partitionKeyToInstances =
+        
PropagationUtils.getPartitionConfigKeyToInstances(_pinotHelixResourceManager);

Review Comment:
   Is there a need to fetch all InstancePartitions here? Is it not enough to do 
a pointed fetch for the InstancePartitions that you care about based on the 
node configs? 
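The pointed-fetch version could be sketched as below. `resolveServerInstances` and the `fetchInstances` parameter are hypothetical; `fetchInstances` stands in for a per-name ZK read (in the real code, something backed by `InstancePartitionsUtils`) so only the partitions derived from the node config are loaded, not every `InstancePartitions` in the cluster:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Sketch: one targeted lookup per partition name instead of a full cluster scan.
public class PointedFetch {
  static Set<String> resolveServerInstances(List<String> partitionNames,
      Function<String, Set<String>> fetchInstances) {
    Set<String> instances = new HashSet<>();
    for (String name : partitionNames) {
      Set<String> found = fetchInstances.apply(name);  // targeted read; null if the partition is absent
      if (found != null) {
        instances.addAll(found);
      }
    }
    return instances;
  }
}
```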



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationScheme.java:
##########
@@ -33,4 +36,13 @@ public interface PropagationScheme {
    * @return The set of instances to propagate the workload
    */
   Set<String> resolveInstances(NodeConfig nodeConfig);
+
+  /**
+   * Computes the per-instance cost map for the given node config using the 
provided splitter.
+   *
+   * @param nodeConfig Node configuration containing cost splits and scope.
+   * @param costSplitter Strategy used to compute costs per instance.
+   * @return A mapping of instance name to its computed {@link InstanceCost}.
+   */
+  Map<String, InstanceCost> resolveInstanceCostMap(NodeConfig nodeConfig, 
CostSplitter costSplitter);

Review Comment:
   PropagationScheme is responsible for resolving instances for a workload. 
Adding a required function in this interface that computes the cost of each 
resolved instance seems counter-intuitive. We already have 
computeInstanceCostMap in the CostSplitter interface. So we can just use that 
directly in QueryWorkloadManager (as it has the _costSplitter member).
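The proposed composition could look like this minimal sketch. The interfaces here are deliberately simplified stand-ins (the real `resolveInstances` and `computeInstanceCostMap` also take a `NodeConfig`, elided for brevity): the scheme only resolves instances, and the manager combines it with the `CostSplitter` it already owns:

```java
import java.util.Map;
import java.util.Set;

// Sketch: keep cost computation out of PropagationScheme and compose the two
// collaborators inside QueryWorkloadManager instead.
public class ManagerComposition {
  record InstanceCost(long cpuCostNs, long memoryCostBytes) { }

  interface PropagationScheme {
    Set<String> resolveInstances();
  }

  interface CostSplitter {
    Map<String, InstanceCost> computeInstanceCostMap(Set<String> instances);
  }

  // Manager-side composition: the scheme resolves, the splitter prices.
  static Map<String, InstanceCost> computeCosts(PropagationScheme scheme, CostSplitter splitter) {
    return splitter.computeInstanceCostMap(scheme.resolveInstances());
  }
}
```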



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -4738,33 +4735,29 @@ public List<QueryWorkloadConfig> 
getAllQueryWorkloadConfigs() {
     return ZKMetadataProvider.getAllQueryWorkloadConfigs(_propertyStore);
   }
 
+  public List<InstancePartitions> getAllInstancePartitions() {

Review Comment:
   Is this util function needed? It doesn't seem to do anything other than calling ZKMetadataProvider.getAllInstancePartitions().



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java:
##########
@@ -19,60 +19,263 @@
 package org.apache.pinot.controller.workload.scheme;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.workload.splitter.CostSplitter;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.workload.InstanceCost;
 import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationEntity;
+import org.apache.pinot.spi.config.workload.PropagationEntityOverrides;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 /**
- * TablePropagationScheme is used to resolve instances based on the {@link 
NodeConfig} and {@link NodeConfig.Type}
- * It resolves the instances based on the table names specified in the node 
configuration
+ * A {@code TablePropagationScheme} resolves Pinot instances based on table 
names in a node
+ * configuration.
+ *
  */
 public class TablePropagationScheme implements PropagationScheme {
 
-  private static PinotHelixResourceManager _pinotHelixResourceManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
 
   public TablePropagationScheme(PinotHelixResourceManager 
pinotHelixResourceManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
   }
 
-  @Override
+  /**
+   * Resolves the union of all instances across all cost splits for the given 
node config.
+   *
+   * Example:
+   * <pre>
+   *   { "Broker_Instance_1", "Broker_Instance_2", "Server_Instance_1" }
+   * </pre>
+   */
   public Set<String> resolveInstances(NodeConfig nodeConfig) {
     Set<String> instances = new HashSet<>();
-    List<String> tableNames = nodeConfig.getPropagationScheme().getValues();
-    Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags
-            = PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
-    Map<String, Set<String>> helixTagToInstances
-            = 
PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
-    for (String tableName : tableNames) {
-      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-      List<String> tablesWithType = new ArrayList<>();
-      if (tableType == null) {
-        // Get both offline and realtime table names if type is not present.
-        
tablesWithType.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
-        
tablesWithType.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+    Map<String, Set<String>> partitionKeyToInstances =
+        
PropagationUtils.getPartitionConfigKeyToInstances(_pinotHelixResourceManager);
+    HelixManager helixManager = _pinotHelixResourceManager.getHelixZkManager();
+    ExternalView brokerResource = 
HelixHelper.getExternalViewForResource(helixManager.getClusterManagmentTool(),
+        helixManager.getClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    NodeConfig.Type nodeType = nodeConfig.getNodeType();
+
+    for (PropagationEntity entity : 
nodeConfig.getPropagationScheme().getPropagationEntities()) {
+      String tableName = entity.getEntity();
+      if (nodeConfig.getNodeType() == NodeConfig.Type.BROKER_NODE) {
+        for (String tableWithType : expandToTablesWithType(tableName)) {
+          instances.addAll(getBrokerInstances(brokerResource, tableWithType));
+        }
+      } else if (nodeType == NodeConfig.Type.SERVER_NODE) {
+        for (String partitionKey : 
InstancePartitionsUtils.getAllPossibleInstancePartitionsName(tableName)) {
+          instances.addAll(partitionKeyToInstances.getOrDefault(partitionKey, 
Collections.emptySet()));
+        }
+      } else {
+        throw new IllegalStateException("Unsupported node type: " + nodeType);
+      }
+    }
+    return instances;
+  }
+
+  /**
+   * Computes the per-instance cost map for the given node config using the 
provided splitter.
+   *
+   * <p>
+   * For each cost split in the node config, the relevant instances are 
resolved based on
+   * table names and node type. The splitter is then used to compute costs for 
those instances.
+   * If multiple splits resolve to the same instance, their costs are summed.
+   * </p>
+   *
+   * Example:
+   * <pre>
+   *   {
+   *     "Broker_Instance_1": { cpuCostNs: 1000000, memoryCostBytes: 1048576 },
+   *     "Broker_Instance_2": { cpuCostNs: 1000000, memoryCostBytes: 1048576 },
+   *     "Server_Instance_1": { cpuCostNs: 2000000, memoryCostBytes: 2097152 },
+   *   }
+   * </pre>
+   */
+  @Override
+  public Map<String, InstanceCost> resolveInstanceCostMap(NodeConfig 
nodeConfig, CostSplitter costSplitter) {
+    Map<String, InstanceCost> instanceCostMap = new HashMap<>();
+    // One-time zk lookups reused across all entities
+    HelixManager helixManager = _pinotHelixResourceManager.getHelixZkManager();
+    ExternalView brokerResource = 
HelixHelper.getExternalViewForResource(helixManager.getClusterManagmentTool(),
+        helixManager.getClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    Map<String, Set<String>> partitionKeyToInstances =
+        
PropagationUtils.getPartitionConfigKeyToInstances(_pinotHelixResourceManager);
+
+    NodeConfig.Type nodeType = nodeConfig.getNodeType();
+
+    for (PropagationEntity entity : 
nodeConfig.getPropagationScheme().getPropagationEntities()) {
+      String tableName = entity.getEntity();
+      Set<String> instances;
+      Map<String, InstanceCost> deltaCost;
+      if (nodeType == NodeConfig.Type.BROKER_NODE) {
+        instances = getBrokerInstances(brokerResource, tableName);

Review Comment:
   (nit) Some of this logic is repeated between resolveInstances and resolveInstanceCostMap. Can you check if we can consolidate it in a util?
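One way the consolidation could go, as a hedged sketch with simplified inputs: a hypothetical `resolveInstancesForEntity` helper that both `resolveInstances` and `resolveInstanceCostMap` call, so the broker/server branching lives in a single place (the real version would take the `ExternalView` and partition-key lookups built from Helix state):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: shared per-entity resolution used by both code paths.
public class EntityResolution {
  enum NodeType { BROKER_NODE, SERVER_NODE }

  static Set<String> resolveInstancesForEntity(NodeType nodeType, String tableName,
      Map<String, Set<String>> brokerInstancesByTable,
      Map<String, Set<String>> partitionKeyToInstances, List<String> partitionKeysForTable) {
    if (nodeType == NodeType.BROKER_NODE) {
      // Brokers are looked up from the broker resource external view.
      return brokerInstancesByTable.getOrDefault(tableName, Set.of());
    }
    if (nodeType == NodeType.SERVER_NODE) {
      // Servers are the union over the table's candidate instance partitions.
      Set<String> instances = new HashSet<>();
      for (String key : partitionKeysForTable) {
        instances.addAll(partitionKeyToInstances.getOrDefault(key, Set.of()));
      }
      return instances;
    }
    throw new IllegalStateException("Unsupported node type: " + nodeType);
  }
}
```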



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -170,34 +312,182 @@ public Map<String, InstanceCost> 
getWorkloadToInstanceCostFor(String instanceNam
         return workloadToInstanceCostMap;
       }
 
-      // Find all workloads associated with the helix tags
+      // Find all helix tags for this instance
+      InstanceConfig instanceConfig = 
_pinotHelixResourceManager.getHelixInstanceConfig(instanceName);
+      if (instanceConfig == null) {
+        LOGGER.warn("Instance config not found for instance: {}", 
instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      List<String> instanceTags = instanceConfig.getTags();
+      if (instanceTags == null || instanceTags.isEmpty()) {
+        LOGGER.warn("No tags found for instance: {}, cannot compute workload 
costs", instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      // Filter workloads by the instance's tags
       Set<QueryWorkloadConfig> queryWorkloadConfigsForTags =
-          
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, 
instanceConfig.getTags(),
-                  queryWorkloadConfigs);
-      // Calculate the instance cost from each workload
+          
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, 
instanceTags,
+              queryWorkloadConfigs);
+
+      if (queryWorkloadConfigsForTags.isEmpty()) {
+        LOGGER.debug("No workload configs match instance: {}", instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      // For each workload, aggregate contributions across all applicable 
nodeConfigs and propagation entities
       for (QueryWorkloadConfig queryWorkloadConfig : 
queryWorkloadConfigsForTags) {
+        String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
         for (NodeConfig nodeConfig : queryWorkloadConfig.getNodeConfigs()) {
-          if (nodeConfig.getNodeType() == nodeType) {
-            Set<String> instances = resolveInstances(nodeConfig);
-            InstanceCost instanceCost = 
_costSplitter.computeInstanceCost(nodeConfig, instances, instanceName);
-            if (instanceCost != null) {
-              
workloadToInstanceCostMap.put(queryWorkloadConfig.getQueryWorkloadName(), 
instanceCost);
+          try {
+            if (nodeConfig.getNodeType() == nodeType) {
+              List<String> errors = 
QueryWorkloadConfigUtils.validateQueryWorkloadConfig(queryWorkloadConfig);
+              if (!errors.isEmpty()) {
+                LOGGER.error("Invalid QueryWorkloadConfig: {} for instance: 
{}, errors: {}", queryWorkloadConfig,
+                    instanceName, errors);
+                continue;
+              }
+              Map<String, InstanceCost> instanceCostMap =
+                  
_propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme()
+                          
.getPropagationType()).resolveInstanceCostMap(nodeConfig, _costSplitter);
+
+              InstanceCost instanceCost = instanceCostMap.get(instanceName);
+              if (instanceCost != null) {
+                workloadToInstanceCostMap.put(queryWorkloadName, instanceCost);
+                LOGGER.info("Found workload cost for instance: {} workload: {} 
cost: {}",
+                    instanceName, queryWorkloadName, instanceCost);
+              }
+              // There should be only one matching nodeConfig (BROKER_NODE or 
SERVER_NODE) within a workload
+              break;
             }
-            break;
+          } catch (Exception e) {
+            LOGGER.error("Failed to compute instance cost for instance: {} 
workload: {}",
+                instanceName, queryWorkloadName, e);
+            // Continue with other workloads instead of failing completely
           }
         }
       }
+      LOGGER.info("Computed {} workload costs for instance: {}", 
workloadToInstanceCostMap.size(), instanceName);
       return workloadToInstanceCostMap;
     } catch (Exception e) {
-      String errorMsg = String.format("Failed to get workload to instance cost 
map for instance: %s", instanceName);
+      String errorMsg = String.format("Failed to compute workload costs for 
instance: %s", instanceName);
       LOGGER.error(errorMsg, e);
       throw new RuntimeException(errorMsg, e);
     }
   }
 
   private Set<String> resolveInstances(NodeConfig nodeConfig) {
     PropagationScheme propagationScheme =
-            
_propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme().getPropagationType());
+        
_propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme().getPropagationType());
     return propagationScheme.resolveInstances(nodeConfig);
   }
+
+  /**
+   * If all propagation entities have empty cpu or memory costs, distributes the node's total budget
+   * evenly among them. If all entities have defined costs, this is a no-op. Mixing defined and
+   * empty costs is rejected with an exception.
+   *
+   * @param nodeConfig The node config containing the propagation entities to 
check and distribute costs for.
+   */
+  private void checkAndDistributeEmptyPropagationEntitiesEvenly(NodeConfig 
nodeConfig) {
+    List<PropagationEntity> propagationEntities = 
nodeConfig.getPropagationScheme().getPropagationEntities();
+    int emptyEntitiesCount = 0;
+    int definedEntitiesCount = 0;
+    for (PropagationEntity entity : propagationEntities) {
+      if (entity.getCpuCostNs() == null || entity.getMemoryCostBytes() == 
null) {
+        emptyEntitiesCount++;
+      } else {
+        definedEntitiesCount++;
+      }
+    }
+    if (definedEntitiesCount > 0 && emptyEntitiesCount > 0) {
+      String errorMsg = String.format("Mixed defined and empty costs in 
propagation entities is not supported. "
+          + "NodeConfig: %s", nodeConfig);
+      LOGGER.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    if (emptyEntitiesCount == propagationEntities.size()) {
+      // All entities have empty costs - distribute total budget evenly
+      long totalCpuCostNs = nodeConfig.getEnforcementProfile().getCpuCostNs();
+      long totalMemoryCostBytes = 
nodeConfig.getEnforcementProfile().getMemoryCostBytes();
+
+      int numEntities = propagationEntities.size();
+      long shareCpuCostNs = totalCpuCostNs / numEntities;
+      long shareMemoryCostBytes = totalMemoryCostBytes / numEntities;
+
+      for (PropagationEntity entity : propagationEntities) {
+        entity.setCpuCostNs(shareCpuCostNs);
+        entity.setMemoryCostBytes(shareMemoryCostBytes);
+      }
+      LOGGER.info("Distributed costs evenly across {} entities: CPU={}ns, 
Memory={}bytes per entity",
+          numEntities, shareCpuCostNs, shareMemoryCostBytes);
+    }
+  }
+
+  /**
+   * Sends the given map of {@link QueryWorkloadRefreshMessage} to their 
respective instances asynchronously.
+   *
+   * <p>
+   * Each message is sent in its own asynchronous task, and the method waits 
for all tasks to complete
+   * with a timeout of 60 seconds per instance. Success and failure counts are 
logged.
+   * </p>
+   *
+   * @param instanceToRefreshMessageMap A map from instance name to the {@link 
QueryWorkloadRefreshMessage} to send.
+   */
+  public void sendQueryWorkloadRefreshMessage(Map<String, 
QueryWorkloadRefreshMessage> instanceToRefreshMessageMap) {
+    ClusterMessagingService messagingService = 
_pinotHelixResourceManager.getHelixZkManager().getMessagingService();
+    List<CompletableFuture<Boolean>> futures = 
instanceToRefreshMessageMap.entrySet().stream()

Review Comment:
   You had a valid reason for handling the message-sending logic this way - the 
latency of queuing each task. Can you please add a detailed comment here 
explaining why you decided to do it this way? 
   
   An alternative here is to rely on Server API calls to send this detail to 
the server. Please create a ticket for yourself and let's explore this as part 
of the perf benchmarking. 
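   For context, the fan-out pattern under discussion can be sketched with plain JDK types (a hypothetical sketch; `send` stands in for the real Helix messaging call, and the per-instance timeout mirrors the 60-second wait described in the javadoc):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

// Hypothetical sketch: each instance's message is dispatched in its own
// async task so a slow send to one instance does not serialize the rest;
// the caller then waits on every future with a per-instance timeout and
// tallies successes for logging.
final class AsyncFanOut {
  static long sendAll(Map<String, String> instanceToMessage,
                      BiPredicate<String, String> send,
                      long timeoutSecondsPerInstance) {
    List<CompletableFuture<Boolean>> futures = instanceToMessage.entrySet().stream()
        .map(e -> CompletableFuture.supplyAsync(() -> send.test(e.getKey(), e.getValue())))
        .collect(Collectors.toList());
    long successes = 0;
    for (CompletableFuture<Boolean> f : futures) {
      try {
        if (f.get(timeoutSecondsPerInstance, TimeUnit.SECONDS)) {
          successes++;
        }
      } catch (Exception e) {
        // Timeout or send failure counts as a failure; caller logs totals.
      }
    }
    return successes;
  }
}
```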
   
   



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -58,78 +82,186 @@ public QueryWorkloadManager(PinotHelixResourceManager 
pinotHelixResourceManager)
   }
 
   /**
-   * Propagate the workload to the relevant instances based on the 
PropagationScheme
-   * @param queryWorkloadConfig The query workload configuration to propagate
-   * 1. Resolve the instances based on the node type and propagation scheme
-   * 2. Calculate the instance cost for each instance
-   * 3. Send the {@link QueryWorkloadRefreshMessage} to the instances
+   * Propagates an upsert of a workload's cost configuration to all relevant 
instances.
+   *
+   * <p>
+   * For each {@link NodeConfig} in the supplied {@link QueryWorkloadConfig}, 
this method:
+   * </p>
+   * <ol>
+   *   <li>Resolves the {@link PropagationScheme} from the node's configured 
scheme type.</li>
+   *   <li>Computes the per-instance {@link InstanceCost} map using the 
configured
+   *       {@link CostSplitter}.</li>
+   *   <li>Sends a {@link QueryWorkloadRefreshMessage} with subtype
+   *       {@link 
QueryWorkloadRefreshMessage#REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE} to each
+   *       instance with its computed cost.</li>
+   * </ol>
+   *
+   * <p>
+   * This call is idempotent from the manager's perspective: the same inputs 
will result in the
+   * same set of messages being sent. Instances are expected to apply the new 
costs immediately.
+   * </p>
+   *
+   * <p>
+   *  This call is atomic to the extent possible: if any error occurs while estimating the target
+   *  instances and their costs, the entire propagation is aborted and no partial updates are sent
+   *  to any instances.
+   * </p>
+   *
+   * <p>
+   *  We rely on Helix reliable messaging to ensure message delivery to 
instances.
+   *  However, if an instance is down during the propagation, it will miss the update. To recover,
+   *  the instance fetches the latest workload configs from the controller during startup.
+   * </p>
+   *
+   * @param queryWorkloadConfig The workload definition (name, node types, 
budgets, and propagation
+   *                            scheme) to propagate.
    */
   public void propagateWorkloadUpdateMessage(QueryWorkloadConfig 
queryWorkloadConfig) {
     String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
-    for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
-      // Resolve the instances based on the node type and propagation scheme
-      Set<String> instances = resolveInstances(nodeConfig);
-      if (instances.isEmpty()) {
-        String errorMsg = String.format("No instances found for Workload: %s", 
queryWorkloadName);
-        LOGGER.warn(errorMsg);
-        continue;
+    LOGGER.info("Propagating workload update for: {}", queryWorkloadName);
+
+    Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = new 
HashMap<>();
+    try {
+      for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
+        PropagationScheme propagationScheme = 
_propagationSchemeProvider.getPropagationScheme(
+            nodeConfig.getPropagationScheme().getPropagationType());
+        // For propagation entities with empty cpu or memory cost, distribute 
the remaining cost evenly among them
+        checkAndDistributeEmptyPropagationEntitiesEvenly(nodeConfig);
+        Map<String, InstanceCost> instanceCostMap = 
propagationScheme.resolveInstanceCostMap(nodeConfig, _costSplitter);
+        if (instanceCostMap.isEmpty()) {
+          // This is to ensure that the configured entity is valid and maps to 
some instances
+          String errorMsg = String.format("No instances found for workload 
update: %s with nodeConfig: %s",
+              queryWorkloadName, nodeConfig);
+          LOGGER.error(errorMsg);
+          throw new RuntimeException(errorMsg);
+        }
+
+        Map<String, QueryWorkloadRefreshMessage> nodeToRefreshMessageMap = 
instanceCostMap.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, entry -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
+                
QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE, 
entry.getValue())));
+        instanceToRefreshMessageMap.putAll(nodeToRefreshMessageMap);
       }
-      Map<String, InstanceCost> instanceCostMap = 
_costSplitter.computeInstanceCostMap(nodeConfig, instances);
-      Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instanceCostMap.entrySet().stream()
-          .collect(Collectors.toMap(Map.Entry::getKey, entry -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
-              QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE, 
entry.getValue())));
-      // Send the QueryWorkloadRefreshMessage to the instances
-      
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      // Sends the message only after all nodeConfigs are processed 
successfully
+      sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      LOGGER.info("Successfully propagated workload update for: {} to {} 
instances", queryWorkloadName,
+          instanceToRefreshMessageMap.size());
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to propagate workload update 
for: %s", queryWorkloadName);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
     }
   }
 
   /**
-   * Propagate delete workload refresh message for the given 
queryWorkloadConfig
-   * @param queryWorkloadConfig The query workload configuration to delete
-   * 1. Resolve the instances based on the node type and propagation scheme
-   * 2. Send the {@link QueryWorkloadRefreshMessage} with 
DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE to the instances
+   * Propagates a delete for the given workload to all relevant instances.
+   *
+   * <p>
+   * The method resolves the target instances for each {@link NodeConfig} and 
sends a
+   * {@link QueryWorkloadRefreshMessage} with subtype
+   * {@link QueryWorkloadRefreshMessage#DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE},
+   * which instructs the instance to remove local state associated with the 
workload and stop enforcing costs for it.
+   * </p>
+   *
+   * @param queryWorkloadConfig The workload to delete (only the name and node 
scoping are used).
    */
   public void propagateDeleteWorkloadMessage(QueryWorkloadConfig 
queryWorkloadConfig) {
     String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
+    LOGGER.info("Propagating workload delete for: {}", queryWorkloadName);
+
     for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
-      Set<String> instances = resolveInstances(nodeConfig);
-      if (instances.isEmpty()) {
-        String errorMsg = String.format("No instances found for Workload: %s", 
queryWorkloadName);
-        LOGGER.warn(errorMsg);
+      if (nodeConfig == null) {
+        LOGGER.warn("Skipping null NodeConfig for workload delete: {}", 
queryWorkloadName);
         continue;
       }
-      Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instances.stream()
-          .collect(Collectors.toMap(instance -> instance, instance -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
-              QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE, 
null)));
-      
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      try {
+        Set<String> instances = resolveInstances(nodeConfig);
+        if (instances.isEmpty()) {
+          LOGGER.warn("No instances found for workload delete: {} with 
nodeConfig: {}", queryWorkloadName, nodeConfig);
+          continue;
+        }
+        QueryWorkloadRefreshMessage deleteMessage = new 
QueryWorkloadRefreshMessage(queryWorkloadName,
+            QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE, 
new InstanceCost(0, 0));
+        Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instances.stream()
+            .collect(Collectors.toMap(instance -> instance, instance -> 
deleteMessage));
+
+        // Send the QueryWorkloadRefreshMessage to the instances
+        sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+        LOGGER.info("Successfully propagated workload delete for: {} to {} 
instances", queryWorkloadName,
+            instances.size());
+      } catch (Exception e) {
+        String errorMsg = String.format("Failed to propagate workload delete 
for: %s with nodeConfig: %s",
+            queryWorkloadName, nodeConfig);
+        LOGGER.error(errorMsg, e);
+        throw new RuntimeException(errorMsg, e);
+      }
     }
   }
 
   /**
-   * Propagate the workload for the given table name, it does fast exits if 
queryWorkloadConfigs is empty
-   * @param tableName The table name to propagate the workload for, it can be 
a rawTableName or a tableNameWithType
-   * if rawTableName is provided, it will resolve all available tableTypes and 
propagate the workload for each tableType
-   *
-   * This method performs the following steps:
-   * 1. Find all the helix tags associated with the table
-   * 2. Find all the {@link QueryWorkloadConfig} associated with the helix tags
-   * 3. Propagate the workload cost for instances associated with the workloads
+   * Propagates workload updates for all workloads that apply to the given 
table.
+   *
+   * <p>
+   * This helper performs the following:
+   * </p>
+   * <ol>
+   *   <li>Fetches all {@link QueryWorkloadConfig}s from Zookeeper.</li>
+   *   <li>Resolves the Helix tags associated with the table (supports raw 
table names and
+   *       type-qualified names).</li>
+   *   <li>Filters the workload configs to those whose scope matches the 
table's tags.</li>
+   *   <li>Invokes {@link 
#propagateWorkloadUpdateMessage(QueryWorkloadConfig)} for each match.</li>
+   * </ol>
+   *
+   * <p>
+   * If no workloads are configured, the method returns immediately. Any 
exception encountered is
+   * logged and rethrown as a {@link RuntimeException}.
+   * </p>
+   *
+   * @param tableName The raw or type-qualified table name (e.g., {@code 
myTable} or
+   *                  {@code myTable_OFFLINE}).
+   * @throws RuntimeException If propagation fails due to Helix/ZK access or 
message dispatch
+   *                          errors.
    */
   public void propagateWorkloadFor(String tableName) {
     try {
       List<QueryWorkloadConfig> queryWorkloadConfigs = 
_pinotHelixResourceManager.getAllQueryWorkloadConfigs();
       if (queryWorkloadConfigs.isEmpty()) {
-          return;
+        return;
       }
       // Get the helixTags associated with the table
       List<String> helixTags = 
PropagationUtils.getHelixTagsForTable(_pinotHelixResourceManager, tableName);
+      if (helixTags.isEmpty()) {

Review Comment:
   Is this condition even possible? I thought tableValidation ensures that a 
broker/server tags are always present for tables. 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -170,34 +312,182 @@ public Map<String, InstanceCost> 
getWorkloadToInstanceCostFor(String instanceNam
         return workloadToInstanceCostMap;
       }
 
-      // Find all workloads associated with the helix tags
+      // Find all helix tags for this instance
+      InstanceConfig instanceConfig = 
_pinotHelixResourceManager.getHelixInstanceConfig(instanceName);
+      if (instanceConfig == null) {
+        LOGGER.warn("Instance config not found for instance: {}", 
instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      List<String> instanceTags = instanceConfig.getTags();
+      if (instanceTags == null || instanceTags.isEmpty()) {
+        LOGGER.warn("No tags found for instance: {}, cannot compute workload 
costs", instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      // Filter workloads by the instance's tags
       Set<QueryWorkloadConfig> queryWorkloadConfigsForTags =
-          
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, 
instanceConfig.getTags(),
-                  queryWorkloadConfigs);
-      // Calculate the instance cost from each workload
+          
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, 
instanceTags,
+              queryWorkloadConfigs);
+
+      if (queryWorkloadConfigsForTags.isEmpty()) {
+        LOGGER.debug("No workload configs match instance: {}", instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      // For each workload, aggregate contributions across all applicable 
nodeConfigs and propagation entities
       for (QueryWorkloadConfig queryWorkloadConfig : 
queryWorkloadConfigsForTags) {
+        String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
         for (NodeConfig nodeConfig : queryWorkloadConfig.getNodeConfigs()) {
-          if (nodeConfig.getNodeType() == nodeType) {
-            Set<String> instances = resolveInstances(nodeConfig);
-            InstanceCost instanceCost = 
_costSplitter.computeInstanceCost(nodeConfig, instances, instanceName);
-            if (instanceCost != null) {
-              
workloadToInstanceCostMap.put(queryWorkloadConfig.getQueryWorkloadName(), 
instanceCost);
+          try {
+            if (nodeConfig.getNodeType() == nodeType) {
+              List<String> errors = 
QueryWorkloadConfigUtils.validateQueryWorkloadConfig(queryWorkloadConfig);

Review Comment:
   Same here. Why are we validating workloadConfigs here? It should happen only 
during add/update of workload configs. Validating workload configs here and 
failing is too late. 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java:
##########
@@ -19,60 +19,263 @@
 package org.apache.pinot.controller.workload.scheme;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.workload.splitter.CostSplitter;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.workload.InstanceCost;
 import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationEntity;
+import org.apache.pinot.spi.config.workload.PropagationEntityOverrides;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 /**
- * TablePropagationScheme is used to resolve instances based on the {@link 
NodeConfig} and {@link NodeConfig.Type}
- * It resolves the instances based on the table names specified in the node 
configuration
+ * A {@code TablePropagationScheme} resolves Pinot instances based on table 
names in a node
+ * configuration.
+ *
  */
 public class TablePropagationScheme implements PropagationScheme {
 
-  private static PinotHelixResourceManager _pinotHelixResourceManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
 
   public TablePropagationScheme(PinotHelixResourceManager 
pinotHelixResourceManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
   }
 
-  @Override
+  /**
+   * Resolves the union of all instances across all cost splits for the given 
node config.
+   *
+   * Example:
+   * <pre>
+   *   { "Broker_Instance_1", "Broker_Instance_2", "Server_Instance_1" }
+   * </pre>
+   */
   public Set<String> resolveInstances(NodeConfig nodeConfig) {
     Set<String> instances = new HashSet<>();
-    List<String> tableNames = nodeConfig.getPropagationScheme().getValues();
-    Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags
-            = PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
-    Map<String, Set<String>> helixTagToInstances
-            = 
PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);

Review Comment:
   Good catch! 
   However, this wouldn't work for Balanced as the Table-IP is never set... The 
ideal way to do it would be to check tableConfig and then rely on either 
Table-IP or Helix tags.
   
   For now, handling Balanced can be a TODO. 
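   The fallback described above - use instance partitions (Table-IP) when the table config has them, otherwise rely on Helix tags (the Balanced case) - can be sketched with plain JDK types (hypothetical; `instancePartitions` and `tagInstances` are illustrative stand-ins for the Table-IP lookup and the tag-based lookup):

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical TODO sketch: prefer Table-IP when present and non-empty,
// otherwise fall back to tag-based resolution for Balanced assignment.
final class TableInstanceResolution {
  static Set<String> resolve(Optional<Set<String>> instancePartitions, Set<String> tagInstances) {
    return instancePartitions.filter(s -> !s.isEmpty()).orElse(tagInstances);
  }
}
```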



##########
pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java:
##########
@@ -227,6 +227,15 @@ public boolean canAdmitQuery(String workload) {
     BudgetStats stats = budget.getStats();
     return stats._cpuRemaining > 0 && stats._memoryRemaining > 0;
   }
+
+  public Map<String, BudgetStats> getAllBudgetStats() {

Review Comment:
   We already have getRemainingBudgetAcrossAllWorkloads(). This doesn't seem to 
be doing anything other than converting BudgetStats --> Map.  Can we remove 
this? 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -170,34 +312,182 @@ public Map<String, InstanceCost> 
getWorkloadToInstanceCostFor(String instanceNam
         return workloadToInstanceCostMap;
       }
 
-      // Find all workloads associated with the helix tags
+      // Find all helix tags for this instance
+      InstanceConfig instanceConfig = 
_pinotHelixResourceManager.getHelixInstanceConfig(instanceName);
+      if (instanceConfig == null) {
+        LOGGER.warn("Instance config not found for instance: {}", 
instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      List<String> instanceTags = instanceConfig.getTags();
+      if (instanceTags == null || instanceTags.isEmpty()) {
+        LOGGER.warn("No tags found for instance: {}, cannot compute workload 
costs", instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      // Filter workloads by the instance's tags
       Set<QueryWorkloadConfig> queryWorkloadConfigsForTags =
-          
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, 
instanceConfig.getTags(),
-                  queryWorkloadConfigs);
-      // Calculate the instance cost from each workload
+          
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, 
instanceTags,
+              queryWorkloadConfigs);
+
+      if (queryWorkloadConfigsForTags.isEmpty()) {
+        LOGGER.debug("No workload configs match instance: {}", instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      // For each workload, aggregate contributions across all applicable 
nodeConfigs and propagation entities
       for (QueryWorkloadConfig queryWorkloadConfig : 
queryWorkloadConfigsForTags) {
+        String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
         for (NodeConfig nodeConfig : queryWorkloadConfig.getNodeConfigs()) {
-          if (nodeConfig.getNodeType() == nodeType) {
-            Set<String> instances = resolveInstances(nodeConfig);
-            InstanceCost instanceCost = 
_costSplitter.computeInstanceCost(nodeConfig, instances, instanceName);
-            if (instanceCost != null) {
-              
workloadToInstanceCostMap.put(queryWorkloadConfig.getQueryWorkloadName(), 
instanceCost);
+          try {
+            if (nodeConfig.getNodeType() == nodeType) {
+              List<String> errors = 
QueryWorkloadConfigUtils.validateQueryWorkloadConfig(queryWorkloadConfig);
+              if (!errors.isEmpty()) {
+                LOGGER.error("Invalid QueryWorkloadConfig: {} for instance: 
{}, errors: {}", queryWorkloadConfig,
+                    instanceName, errors);
+                continue;
+              }
+              Map<String, InstanceCost> instanceCostMap =
+                  
_propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme()
+                          
.getPropagationType()).resolveInstanceCostMap(nodeConfig, _costSplitter);
+
+              InstanceCost instanceCost = instanceCostMap.get(instanceName);
+              if (instanceCost != null) {
+                workloadToInstanceCostMap.put(queryWorkloadName, instanceCost);
+                LOGGER.info("Found workload cost for instance: {} workload: {} 
cost: {}",
+                    instanceName, queryWorkloadName, instanceCost);
+              }
+              // There should be only one matching nodeConfig (BROKER_NODE or 
SERVER_NODE) within a workload
+              break;
             }
-            break;
+          } catch (Exception e) {
+            LOGGER.error("Failed to compute instance cost for instance: {} 
workload: {}",
+                instanceName, queryWorkloadName, e);
+            // Continue with other workloads instead of failing completely
           }
         }
       }
+      LOGGER.info("Computed {} workload costs for instance: {}", 
workloadToInstanceCostMap.size(), instanceName);
       return workloadToInstanceCostMap;
     } catch (Exception e) {
-      String errorMsg = String.format("Failed to get workload to instance cost 
map for instance: %s", instanceName);
+      String errorMsg = String.format("Failed to compute workload costs for 
instance: %s", instanceName);
       LOGGER.error(errorMsg, e);
       throw new RuntimeException(errorMsg, e);
     }
   }
 
   private Set<String> resolveInstances(NodeConfig nodeConfig) {
     PropagationScheme propagationScheme =
-            
_propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme().getPropagationType());
+        
_propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme().getPropagationType());
     return propagationScheme.resolveInstances(nodeConfig);
   }
+
+  /**
+   * If all propagation entities have empty cpu or memory costs, distributes the node's total budget
+   * evenly among them. If all entities have defined costs, this is a no-op. Mixing defined and
+   * empty costs is rejected with an exception.
+   *
+   * @param nodeConfig The node config containing the propagation entities to 
check and distribute costs for.
+   */
+  private void checkAndDistributeEmptyPropagationEntitiesEvenly(NodeConfig 
nodeConfig) {
+    List<PropagationEntity> propagationEntities = 
nodeConfig.getPropagationScheme().getPropagationEntities();
+    int emptyEntitiesCount = 0;
+    int definedEntitiesCount = 0;
+    for (PropagationEntity entity : propagationEntities) {
+      if (entity.getCpuCostNs() == null || entity.getMemoryCostBytes() == 
null) {
+        emptyEntitiesCount++;
+      } else {
+        definedEntitiesCount++;
+      }
+    }
+    if (definedEntitiesCount > 0 && emptyEntitiesCount > 0) {
+      String errorMsg = String.format("Mixed defined and empty costs in 
propagation entities is not supported. "
+          + "NodeConfig: %s", nodeConfig);
+      LOGGER.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    if (emptyEntitiesCount == propagationEntities.size()) {
+      // All entities have empty costs - distribute total budget evenly
+      long totalCpuCostNs = nodeConfig.getEnforcementProfile().getCpuCostNs();
+      long totalMemoryCostBytes = 
nodeConfig.getEnforcementProfile().getMemoryCostBytes();
+
+      int numEntities = propagationEntities.size();
+      long shareCpuCostNs = totalCpuCostNs / numEntities;
+      long shareMemoryCostBytes = totalMemoryCostBytes / numEntities;
+
+      for (PropagationEntity entity : propagationEntities) {
+        entity.setCpuCostNs(shareCpuCostNs);
+        entity.setMemoryCostBytes(shareMemoryCostBytes);
+      }
+      LOGGER.info("Distributed costs evenly across {} entities: CPU={}ns, 
Memory={}bytes per entity",
+          numEntities, shareCpuCostNs, shareMemoryCostBytes);
+    }
+  }
+
+  /**
+   * Sends the given map of {@link QueryWorkloadRefreshMessage} to their 
respective instances asynchronously.
+   *
+   * <p>
+   * Each message is sent in its own asynchronous task, and the method waits 
for all tasks to complete

Review Comment:
   (nit) all tasks to be queued (not complete?) 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -58,78 +82,186 @@ public QueryWorkloadManager(PinotHelixResourceManager 
pinotHelixResourceManager)
   }
 
   /**
-   * Propagate the workload to the relevant instances based on the PropagationScheme
-   * @param queryWorkloadConfig The query workload configuration to propagate
-   * 1. Resolve the instances based on the node type and propagation scheme
-   * 2. Calculate the instance cost for each instance
-   * 3. Send the {@link QueryWorkloadRefreshMessage} to the instances
+   * Propagates an upsert of a workload's cost configuration to all relevant instances.
+   *
+   * <p>
+   * For each {@link NodeConfig} in the supplied {@link QueryWorkloadConfig}, this method:
+   * </p>
+   * <ol>
+   *   <li>Resolves the {@link PropagationScheme} from the node's configured scheme type.</li>
+   *   <li>Computes the per-instance {@link InstanceCost} map using the configured
+   *       {@link CostSplitter}.</li>
+   *   <li>Sends a {@link QueryWorkloadRefreshMessage} with subtype
+   *       {@link QueryWorkloadRefreshMessage#REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE} to each
+   *       instance with its computed cost.</li>
+   * </ol>
+   *
+   * <p>
+   * This call is idempotent from the manager's perspective: the same inputs will result in the
+   * same set of messages being sent. Instances are expected to apply the new costs immediately.
+   * </p>
+   *
+   * <p>
+   * This call is atomic to the extent possible: if any error occurs while resolving the target
+   * instances or computing their costs, the entire propagation is aborted and no partial updates
+   * are sent to any instance.
+   * </p>
+   *
+   * <p>
+   * We rely on Helix reliable messaging to deliver messages to instances. An instance that is
+   * down during the propagation will still miss the update, so instances also fetch the latest
+   * workload configs from the controller during startup.
+   * </p>
+   *
+   * @param queryWorkloadConfig The workload definition (name, node types, budgets, and propagation
+   *                            scheme) to propagate.
    */
   public void propagateWorkloadUpdateMessage(QueryWorkloadConfig queryWorkloadConfig) {
     String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
-    for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
-      // Resolve the instances based on the node type and propagation scheme
-      Set<String> instances = resolveInstances(nodeConfig);
-      if (instances.isEmpty()) {
-        String errorMsg = String.format("No instances found for Workload: %s", queryWorkloadName);
-        LOGGER.warn(errorMsg);
-        continue;
+    LOGGER.info("Propagating workload update for: {}", queryWorkloadName);
+
+    Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = new HashMap<>();
+    try {
+      for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
+        PropagationScheme propagationScheme = _propagationSchemeProvider.getPropagationScheme(
+            nodeConfig.getPropagationScheme().getPropagationType());
+        // For propagation entities with empty cpu or memory cost, distribute the remaining cost evenly among them
+        checkAndDistributeEmptyPropagationEntitiesEvenly(nodeConfig);
+        Map<String, InstanceCost> instanceCostMap = propagationScheme.resolveInstanceCostMap(nodeConfig, _costSplitter);
+        if (instanceCostMap.isEmpty()) {
+          // This is to ensure that the configured entity is valid and maps to some instances
+          String errorMsg = String.format("No instances found for workload update: %s with nodeConfig: %s",
+              queryWorkloadName, nodeConfig);
+          LOGGER.error(errorMsg);
+          throw new RuntimeException(errorMsg);
+        }
+
+        Map<String, QueryWorkloadRefreshMessage> nodeToRefreshMessageMap = instanceCostMap.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, entry -> new QueryWorkloadRefreshMessage(queryWorkloadName,
+                QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE, entry.getValue())));
+        instanceToRefreshMessageMap.putAll(nodeToRefreshMessageMap);
      }
-      Map<String, InstanceCost> instanceCostMap = _costSplitter.computeInstanceCostMap(nodeConfig, instances);
-      Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = instanceCostMap.entrySet().stream()
-          .collect(Collectors.toMap(Map.Entry::getKey, entry -> new QueryWorkloadRefreshMessage(queryWorkloadName,
-              QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE, entry.getValue())));
-      // Send the QueryWorkloadRefreshMessage to the instances
-      _pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      // Sends the message only after all nodeConfigs are processed successfully
+      sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      LOGGER.info("Successfully propagated workload update for: {} to {} instances", queryWorkloadName,
+          instanceToRefreshMessageMap.size());
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to propagate workload update for: %s", queryWorkloadName);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
    }
  }
 
   /**
-   * Propagate delete workload refresh message for the given queryWorkloadConfig
-   * @param queryWorkloadConfig The query workload configuration to delete
-   * 1. Resolve the instances based on the node type and propagation scheme
-   * 2. Send the {@link QueryWorkloadRefreshMessage} with DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE to the instances
+   * Propagates a delete for the given workload to all relevant instances.
+   *
+   * <p>
+   * The method resolves the target instances for each {@link NodeConfig} and sends a
+   * {@link QueryWorkloadRefreshMessage} with subtype
+   * {@link QueryWorkloadRefreshMessage#DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE},
+   * which instructs the instance to remove local state associated with the workload and stop enforcing costs for it.
+   * </p>
+   *
+   * @param queryWorkloadConfig The workload to delete (only the name and node scoping are used).
    */
   public void propagateDeleteWorkloadMessage(QueryWorkloadConfig queryWorkloadConfig) {
     String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
+    LOGGER.info("Propagating workload delete for: {}", queryWorkloadName);
+
     for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
-      Set<String> instances = resolveInstances(nodeConfig);
-      if (instances.isEmpty()) {
-        String errorMsg = String.format("No instances found for Workload: %s", queryWorkloadName);
-        LOGGER.warn(errorMsg);
+      if (nodeConfig == null) {
+        LOGGER.warn("Skipping null NodeConfig for workload delete: {}", queryWorkloadName);
         continue;
       }
-      Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = instances.stream()
-          .collect(Collectors.toMap(instance -> instance, instance -> new QueryWorkloadRefreshMessage(queryWorkloadName,
-              QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE, null)));
-      _pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      try {
+        Set<String> instances = resolveInstances(nodeConfig);
+        if (instances.isEmpty()) {
+          LOGGER.warn("No instances found for workload delete: {} with nodeConfig: {}", queryWorkloadName, nodeConfig);
+          continue;
+        }
+        QueryWorkloadRefreshMessage deleteMessage = new QueryWorkloadRefreshMessage(queryWorkloadName,
+            QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE, new InstanceCost(0, 0));

Review Comment:
   Is this InstanceCost(0,0) needed? What's wrong with null (which is more intuitive for a delete message)?
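One way to picture this suggestion, using hypothetical stand-ins for the PR's message and cost classes (these are sketches, not the actual Pinot types): the receiver branches on the subtype before ever looking at the cost, so a delete can carry `null` and `new InstanceCost(0, 0)` becomes unnecessary.

```java
// Hypothetical stand-ins for the PR's InstanceCost / QueryWorkloadRefreshMessage.
record InstanceCost(long cpuCostNs, long memoryCostBytes) { }

record RefreshMessage(String workloadName, String subType, InstanceCost cost) {
  static final String REFRESH = "REFRESH_QUERY_WORKLOAD";
  static final String DELETE = "DELETE_QUERY_WORKLOAD";
}

public class WorkloadMessageHandler {
  /** Receiver-side handling: a delete carries no cost, so a null payload is acceptable. */
  public String handle(RefreshMessage message) {
    if (RefreshMessage.DELETE.equals(message.subType())) {
      // The cost is irrelevant for a delete; just drop local state for the workload.
      return "deleted " + message.workloadName();
    }
    InstanceCost cost = message.cost();
    if (cost == null) {
      // A refresh without a cost would be a sender-side bug.
      throw new IllegalArgumentException("Refresh message must carry a cost");
    }
    return "applied cpu=" + cost.cpuCostNs() + "ns mem=" + cost.memoryCostBytes() + "bytes";
  }
}
```

The trade-off is null-safety on the wire: if the message serializer rejects null payloads, a zero-cost sentinel may be the pragmatic choice instead.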



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

