praveenc7 commented on code in PR #16672:
URL: https://github.com/apache/pinot/pull/16672#discussion_r2353906560


##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java:
##########
@@ -19,60 +19,192 @@
 package org.apache.pinot.controller.workload.scheme;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.workload.splitter.CostSplitter;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.workload.CostSplit;
+import org.apache.pinot.spi.config.workload.InstanceCost;
 import org.apache.pinot.spi.config.workload.NodeConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 /**
- * TablePropagationScheme is used to resolve instances based on the {@link 
NodeConfig} and {@link NodeConfig.Type}
- * It resolves the instances based on the table names specified in the node 
configuration
+ * A {@code TablePropagationScheme} resolves Pinot instances based on table 
names in a node
+ * configuration.
+ *
+ * <p>
+ * This scheme looks up Helix tags for offline and realtime tables and maps 
them to
+ * instances, enabling workload propagation by table.
+ * </p>
  */
 public class TablePropagationScheme implements PropagationScheme {
 
-  private static PinotHelixResourceManager _pinotHelixResourceManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
 
   public TablePropagationScheme(PinotHelixResourceManager 
pinotHelixResourceManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
   }
 
-  @Override
+  /**
+   * Resolves the union of all instances across all cost splits for the given 
node config.
+   *
+   * @param nodeConfig Node configuration containing propagation scheme and 
cost splits.
+   * @return A set of instance names that should receive workload messages.
+   * @throws IllegalArgumentException If no instances are found for a cost 
split.
+   */
   public Set<String> resolveInstances(NodeConfig nodeConfig) {
     Set<String> instances = new HashSet<>();
-    List<String> tableNames = nodeConfig.getPropagationScheme().getValues();
-    Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags
-            = PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
-    Map<String, Set<String>> helixTagToInstances
-            = 
PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
-    for (String tableName : tableNames) {
-      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-      List<String> tablesWithType = new ArrayList<>();
-      if (tableType == null) {
-        // Get both offline and realtime table names if type is not present.
-        
tablesWithType.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
-        
tablesWithType.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
-      } else {
-        tablesWithType.add(tableName);
+    Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags =
+        PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
+    Map<String, Set<String>> helixTagToInstances =
+        PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
+
+    NodeConfig.Type nodeType = nodeConfig.getNodeType();
+    for (CostSplit costSplit : 
nodeConfig.getPropagationScheme().getCostSplits()) {
+      Map<String, Set<String>> byTag = resolveInstancesByHelixTag(costSplit, 
nodeType, tableWithTypeToHelixTags,
+          helixTagToInstances);
+      if (!byTag.isEmpty()) {
+        for (Set<String> set : byTag.values()) {
+          instances.addAll(set);
+        }
       }
-      for (String tableWithType : tablesWithType) {
-        Map<NodeConfig.Type, Set<String>> nodeToHelixTags = 
tableWithTypeToHelixTags.get(tableWithType);
-        if (nodeToHelixTags != null) {
-          Set<String> helixTags = 
nodeToHelixTags.get(nodeConfig.getNodeType());
-          if (helixTags != null) {
-            for (String helixTag : helixTags) {
-              Set<String> helixInstances = helixTagToInstances.get(helixTag);
-              if (helixInstances != null) {
-                instances.addAll(helixInstances);
-              }
+    }
+    return instances;
+  }
+
+  /**
+   * Computes the per-instance cost map for the given node config using the 
provided splitter.
+   *
+   * <p>
+   * This method supports sub-allocations: the cost-ids in the sub-allocations 
are the helix tags.

Review Comment:
   Agreed, this is not ideal.
   
   The core issue is that we currently rely on tenant names to distinguish 
between completed and consuming tenants. One workaround, as you suggested, 
would be to introduce a custom mapping (e.g., REALTIME_CONSUMING or 
REALTIME_COMPLETED) and hide that detail internally. I didn't choose this 
approach because it essentially hardcodes an extra prefix that users would need 
to remember when configuring it, even though under the hood we'd still be doing 
the same thing. The benefit, however, is that users would not need to know the 
underlying Helix tags.
   
   It would look something like this:
   
   {
     "queryWorkloadName": "myWorkload",
     "nodeConfigs": [
       {
         "nodeType": "SERVER_NODE",
         "enforcementProfile": {
           "cpuCostNs": 1000000000,
           "memoryCostBytes": 1073741824
         },
         "propagationScheme": {
           "propagationType": "TABLE",
           "propagationEntities": [
             {
               "propagationEntity": "myTable",
               "cpuCostNs": 800000000,
               "memoryCostBytes": 858993459,
               "subAllocations": [
                 {
                   "propagationEntity": "OFFLINE",
                   "cpuCostNs": 300000000,
                   "memoryCostBytes": 322122547
                 },
                 {
                   "propagationEntity": "REALTIME_CONSUMING",
                   "cpuCostNs": 250000000,
                   "memoryCostBytes": 268435456
                 },
                 {
                   "propagationEntity": "REALTIME_COMPLETED",
                   "cpuCostNs": 250000000,
                   "memoryCostBytes": 268435456
                 }
               ]
             }
           ]
         }
       }
     ]
   }
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to