praveenc7 commented on code in PR #16672:
URL: https://github.com/apache/pinot/pull/16672#discussion_r2377012252
##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -58,78 +82,186 @@ public QueryWorkloadManager(PinotHelixResourceManager
pinotHelixResourceManager)
}
/**
- * Propagate the workload to the relevant instances based on the
PropagationScheme
- * @param queryWorkloadConfig The query workload configuration to propagate
- * 1. Resolve the instances based on the node type and propagation scheme
- * 2. Calculate the instance cost for each instance
- * 3. Send the {@link QueryWorkloadRefreshMessage} to the instances
+ * Propagates an upsert of a workload's cost configuration to all relevant
instances.
+ *
+ * <p>
+ * For each {@link NodeConfig} in the supplied {@link QueryWorkloadConfig},
this method:
+ * </p>
+ * <ol>
+ * <li>Resolves the {@link PropagationScheme} from the node's configured
scheme type.</li>
+ * <li>Computes the per-instance {@link InstanceCost} map using the
configured
+ * {@link CostSplitter}.</li>
+ * <li>Sends a {@link QueryWorkloadRefreshMessage} with subtype
+ * {@link
QueryWorkloadRefreshMessage#REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE} to each
+ * instance with its computed cost.</li>
+ * </ol>
+ *
+ * <p>
+ * This call is idempotent from the manager's perspective: the same inputs
will result in the
+ * same set of messages being sent. Instances are expected to apply the new
costs immediately.
+ * </p>
+ *
+ * <p>
+ * This call is atomic to the extent possible: if any error occurs while
estimating the target instances
+ * and their costs, the entire propagation is aborted and no partial updates
are sent to any instance.
+ * </p>
+ *
+ * <p>
+ * We rely on Helix reliable messaging to ensure message delivery to
instances.
+ * However, if an instance is down during the propagation, it will miss the
update. To cover this, we have logic
+ * on the instance side to fetch the latest workload configs from the
controller during startup.
+ * </p>
+ *
+ * @param queryWorkloadConfig The workload definition (name, node types,
budgets, and propagation
+ * scheme) to propagate.
*/
public void propagateWorkloadUpdateMessage(QueryWorkloadConfig
queryWorkloadConfig) {
String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
- for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
- // Resolve the instances based on the node type and propagation scheme
- Set<String> instances = resolveInstances(nodeConfig);
- if (instances.isEmpty()) {
- String errorMsg = String.format("No instances found for Workload: %s",
queryWorkloadName);
- LOGGER.warn(errorMsg);
- continue;
+ LOGGER.info("Propagating workload update for: {}", queryWorkloadName);
+
+ Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = new
HashMap<>();
+ try {
+ for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
+ PropagationScheme propagationScheme =
_propagationSchemeProvider.getPropagationScheme(
+ nodeConfig.getPropagationScheme().getPropagationType());
+ // For propagation entities with empty cpu or memory cost, distribute
the remaining cost evenly among them
+ checkAndDistributeEmptyPropagationEntitiesEvenly(nodeConfig);
+ Map<String, InstanceCost> instanceCostMap =
propagationScheme.resolveInstanceCostMap(nodeConfig, _costSplitter);
+ if (instanceCostMap.isEmpty()) {
+ // This is to ensure that the configured entity is valid and maps to
some instances
+ String errorMsg = String.format("No instances found for workload
update: %s with nodeConfig: %s",
+ queryWorkloadName, nodeConfig);
+ LOGGER.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+
+ Map<String, QueryWorkloadRefreshMessage> nodeToRefreshMessageMap =
instanceCostMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> new
QueryWorkloadRefreshMessage(queryWorkloadName,
+
QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE,
entry.getValue())));
+ instanceToRefreshMessageMap.putAll(nodeToRefreshMessageMap);
}
- Map<String, InstanceCost> instanceCostMap =
_costSplitter.computeInstanceCostMap(nodeConfig, instances);
- Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap =
instanceCostMap.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, entry -> new
QueryWorkloadRefreshMessage(queryWorkloadName,
- QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE,
entry.getValue())));
- // Send the QueryWorkloadRefreshMessage to the instances
-
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+ // Sends the message only after all nodeConfigs are processed
successfully
+ sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+ LOGGER.info("Successfully propagated workload update for: {} to {}
instances", queryWorkloadName,
+ instanceToRefreshMessageMap.size());
+ } catch (Exception e) {
+ String errorMsg = String.format("Failed to propagate workload update
for: %s", queryWorkloadName);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(errorMsg, e);
}
}
/**
- * Propagate delete workload refresh message for the given
queryWorkloadConfig
- * @param queryWorkloadConfig The query workload configuration to delete
- * 1. Resolve the instances based on the node type and propagation scheme
- * 2. Send the {@link QueryWorkloadRefreshMessage} with
DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE to the instances
+ * Propagates a delete for the given workload to all relevant instances.
+ *
+ * <p>
+ * The method resolves the target instances for each {@link NodeConfig} and
sends a
+ * {@link QueryWorkloadRefreshMessage} with subtype
+ * {@link QueryWorkloadRefreshMessage#DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE},
+ * which instructs the instance to remove local state associated with the
workload and stop enforcing costs for it.
+ * </p>
+ *
+ * @param queryWorkloadConfig The workload to delete (only the name and node
scoping are used).
*/
public void propagateDeleteWorkloadMessage(QueryWorkloadConfig
queryWorkloadConfig) {
String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
+ LOGGER.info("Propagating workload delete for: {}", queryWorkloadName);
+
for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
- Set<String> instances = resolveInstances(nodeConfig);
- if (instances.isEmpty()) {
- String errorMsg = String.format("No instances found for Workload: %s",
queryWorkloadName);
- LOGGER.warn(errorMsg);
+ if (nodeConfig == null) {
+ LOGGER.warn("Skipping null NodeConfig for workload delete: {}",
queryWorkloadName);
continue;
}
- Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap =
instances.stream()
- .collect(Collectors.toMap(instance -> instance, instance -> new
QueryWorkloadRefreshMessage(queryWorkloadName,
- QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE,
null)));
-
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+ try {
+ Set<String> instances = resolveInstances(nodeConfig);
+ if (instances.isEmpty()) {
+ LOGGER.warn("No instances found for workload delete: {} with
nodeConfig: {}", queryWorkloadName, nodeConfig);
+ continue;
+ }
+ QueryWorkloadRefreshMessage deleteMessage = new
QueryWorkloadRefreshMessage(queryWorkloadName,
+ QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE,
new InstanceCost(0, 0));
+ Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap =
instances.stream()
+ .collect(Collectors.toMap(instance -> instance, instance ->
deleteMessage));
+
+ // Send the QueryWorkloadRefreshMessage to the instances
+ sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+ LOGGER.info("Successfully propagated workload delete for: {} to {}
instances", queryWorkloadName,
+ instances.size());
+ } catch (Exception e) {
+ String errorMsg = String.format("Failed to propagate workload delete
for: %s with nodeConfig: %s",
+ queryWorkloadName, nodeConfig);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(errorMsg, e);
+ }
}
}
/**
- * Propagate the workload for the given table name, it does fast exits if
queryWorkloadConfigs is empty
- * @param tableName The table name to propagate the workload for, it can be
a rawTableName or a tableNameWithType
- * if rawTableName is provided, it will resolve all available tableTypes and
propagate the workload for each tableType
- *
- * This method performs the following steps:
- * 1. Find all the helix tags associated with the table
- * 2. Find all the {@link QueryWorkloadConfig} associated with the helix tags
- * 3. Propagate the workload cost for instances associated with the workloads
+ * Propagates workload updates for all workloads that apply to the given
table.
+ *
+ * <p>
+ * This helper performs the following:
+ * </p>
+ * <ol>
+ * <li>Fetches all {@link QueryWorkloadConfig}s from Zookeeper.</li>
+ * <li>Resolves the Helix tags associated with the table (supports raw
table names and
+ * type-qualified names).</li>
+ * <li>Filters the workload configs to those whose scope matches the
table's tags.</li>
+ * <li>Invokes {@link
#propagateWorkloadUpdateMessage(QueryWorkloadConfig)} for each match.</li>
+ * </ol>
+ *
+ * <p>
+ * If no workloads are configured, the method returns immediately. Any
exception encountered is
+ * logged and rethrown as a {@link RuntimeException}.
+ * </p>
+ *
+ * @param tableName The raw or type-qualified table name (e.g., {@code
myTable} or
+ * {@code myTable_OFFLINE}).
+ * @throws RuntimeException If propagation fails due to Helix/ZK access or
message dispatch
+ * errors.
*/
public void propagateWorkloadFor(String tableName) {
try {
List<QueryWorkloadConfig> queryWorkloadConfigs =
_pinotHelixResourceManager.getAllQueryWorkloadConfigs();
if (queryWorkloadConfigs.isEmpty()) {
- return;
+ return;
}
// Get the helixTags associated with the table
List<String> helixTags =
PropagationUtils.getHelixTagsForTable(_pinotHelixResourceManager, tableName);
+ if (helixTags.isEmpty()) {
+ LOGGER.warn("No Helix tags found for table: {}, skipping workload
propagation", tableName);
+ return;
+ }
+
// Find all workloads associated with the helix tags
Set<QueryWorkloadConfig> queryWorkloadConfigsForTags =
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager,
helixTags, queryWorkloadConfigs);
+
+ if (queryWorkloadConfigsForTags.isEmpty()) {
+ LOGGER.info("No workload configs match table: {}, no propagation
needed", tableName);
+ return;
+ }
+
// Propagate the workload for each QueryWorkloadConfig
+ int successCount = 0;
for (QueryWorkloadConfig queryWorkloadConfig :
queryWorkloadConfigsForTags) {
- propagateWorkloadUpdateMessage(queryWorkloadConfig);
+ try {
+ List<String> errors =
QueryWorkloadConfigUtils.validateQueryWorkloadConfig(queryWorkloadConfig);
Review Comment:
I would still like to keep some validation here to make sure the operator is
doing the right thing. The validation is cheap, and in case of bad manual
edits it provides useful debugging info and prevents a bad config from being propagated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]