vvivekiyer commented on code in PR #16018:
URL: https://github.com/apache/pinot/pull/16018#discussion_r2219885294


##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -212,7 +212,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   /**
    * Approximate heap bytes used by the mutable JSON index at the time of 
index close.
    */
-  MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false);
+  MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false),
+  // Workload Budget exceeded counter
+  WORKLOAD_BUDGET_EXCEEDED("workloadBudgetExceeded", false, "Number of times 
workload budget exceeded");

Review Comment:
   I think it makes sense to have this metric on the broker too. That way we 
can get a count of the number of queries that failed because of exceeding 
workload budget. Would be good to see both broker-host level and <broker, 
workload> level metrics. 



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -199,6 +203,18 @@ public BrokerResponse handleRequest(JsonNode request, 
@Nullable SqlNodeAndOption
         return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
       }
 
+      String workloadName = 
sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.WORKLOAD_NAME);
+      boolean isSecondary = Boolean.parseBoolean(sqlNodeAndOptions.getOptions()
+          .getOrDefault(Broker.Request.QueryOptionKey.IS_SECONDARY_WORKLOAD, 
"false"));
+      if (workloadName != null && _workloadBudgetManager != null
+          && !_workloadBudgetManager.canAdmitQuery(workloadName, isSecondary)) 
{
+        String errorMessage = "Request " + requestId + ": " + query + " 
exceeds query quota for workload: "
+            + workloadName;
+        LOGGER.info(errorMessage);
+        requestContext.setErrorCode(QueryErrorCode.WORKLOAD_QUOTA_EXCEEDED);
+        return new 
BrokerResponseNative(QueryErrorCode.WORKLOAD_QUOTA_EXCEEDED, errorMessage);

Review Comment:
   Is this admission control on the broker even needed? Can't we just rely on 
the Accountant to catch these kill them? 



##########
pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java:
##########
@@ -52,6 +52,7 @@ public enum QueryErrorCode {
   BROKER_REQUEST_SEND(425, "BrokerRequestSend"),
   SERVER_NOT_RESPONDING(427, "ServerNotResponding"),
   TOO_MANY_REQUESTS(429, "TooManyRequests"),
+  WORKLOAD_QUOTA_EXCEEDED(429, "WorkloadQuotaExceeded"),

Review Comment:
   Can we also change all other places to propagate this error code when 
terminating queries because of Workload exhaustion - in the accountant, etc. 



##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -47,10 +48,32 @@ public WorkloadBudgetManager(PinotConfiguration config) {
     }
     _enforcementWindowMs = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
         CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
+    initSecondaryWorkloadBudget(config);
     startBudgetResetTask();
     LOGGER.info("WorkloadBudgetManager initialized with enforcement window: 
{}ms", _enforcementWindowMs);
   }
 
+  /**
+   * This budget is primarily meant to be used for queries that need to be 
issued in a low priority manner.
+   * This is fixed budget allocated during host startup and used across all 
secondary queries.
+   */
+  private void initSecondaryWorkloadBudget(PinotConfiguration config) {
+    _secondaryWorkloadName = config.getProperty(

Review Comment:
   Let us make this optional as not all users would be migrating from 
BinaryWorkloadScheduler?



##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -47,10 +48,32 @@ public WorkloadBudgetManager(PinotConfiguration config) {
     }
     _enforcementWindowMs = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
         CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
+    initSecondaryWorkloadBudget(config);
     startBudgetResetTask();
     LOGGER.info("WorkloadBudgetManager initialized with enforcement window: 
{}ms", _enforcementWindowMs);
   }
 
+  /**
+   * This budget is primarily meant to be used for queries that need to be 
issued in a low priority manner.
+   * This is fixed budget allocated during host startup and used across all 
secondary queries.
+   */
+  private void initSecondaryWorkloadBudget(PinotConfiguration config) {
+    _secondaryWorkloadName = config.getProperty(
+        CommonConstants.Accounting.CONFIG_OF_SECONDARY_WORKLOAD_NAME,
+        CommonConstants.Accounting.DEFAULT_SECONDARY_WORKLOAD_NAME);
+
+    double secondaryCpuPercentage = config.getProperty(
+        CommonConstants.Accounting.CONFIG_OF_SECONDARY_WORKLOAD_CPU_PERCENTAGE,
+        CommonConstants.Accounting.DEFAULT_SECONDARY_WORKLOAD_CPU_PERCENTAGE);
+
+    // The Secondary CPU budget is based on the CPU percentage allocated for 
secondary workload.
+    // The memory budget is set to Long.MAX_VALUE for now, since we do not 
have a specific memory budget for
+    // secondary queries.
+    long secondaryCpuBudget = (long) (secondaryCpuPercentage * 
_enforcementWindowMs * 100_000L);

Review Comment:
   This computation seems off. 
   Considering that there are 50 threads. If secondaryCpuPercentage=10, you'll 
be allocating 20% of CPU to secondary queries?



##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -145,6 +168,43 @@ private void startBudgetResetTask() {
     }, _enforcementWindowMs, _enforcementWindowMs, TimeUnit.MILLISECONDS);
   }
 
+  /**
+   * Determines whether a query for the given workload can be admitted under 
CPU-only budgets.
+   *
+   * <p>Admission rules:
+   * <ol>
+   *   <li>If the manager is disabled or no budget exists for the workload, 
always admit.</li>
+   *   <li>If CPU budget remains above zero, admit immediately.</li>
+   *   <li>Otherwise, reject (return false).</li>
+   * </ol>
+   *
+   * <p>Note: This method currently uses a strict check, where CPU and memory 
budgets must be above zero.
+   * This may be relaxed in the future to allow for a percentage of other 
remaining budget to be used. At that point,
+   * we can have different admission policies like: Strict, Stealing, etc.
+   *
+   * @param workload the workload identifier to check budget for
+   * @return true if the query may be accepted; false if budget is insufficient
+   */
+  public boolean canAdmitQuery(String workload, boolean isSecondary) {

Review Comment:
   canAdmitQuery should only take workload as the parameter. The workload name 
will be equal to "defaultSecondary" if the query belongs to secondary workload. 
And as a part of the init function, we would have already allocated budget for 
it.  This will also eliminate the need for a member variable 
_secondaryWorkloadName. 



##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/WorkloadResourceManager.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler.resources;
+
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+public class WorkloadResourceManager extends ResourceManager {

Review Comment:
   I don't think this class is necessary. We can just use the 
UnboundedResourceManager for now. The only thing this is doing differently is 
setting thread limits based on the policy. But why is that needed here? 



##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/WorkloadScheduler.java:
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.accounting.WorkloadBudgetManager;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
+import org.apache.pinot.core.query.scheduler.resources.WorkloadResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler implementation that supports query admission control based on 
workload-specific budgets.
+ *
+ * <p>This class integrates with the {@link WorkloadBudgetManager} to apply 
CPU and memory budget enforcement
+ * for different workloads, including primary and secondary workloads.</p>
+ *
+ * <p>Secondary workload configuration is used for queries tagged as 
"secondary". Queries that exceed their budget
+ * will be rejected.</p>
+ *
+ */
+public class WorkloadScheduler extends QueryScheduler {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(WorkloadScheduler.class);
+
+  private final WorkloadBudgetManager _workloadBudgetManager;
+  private final ServerMetrics _serverMetrics;
+
+  public WorkloadScheduler(PinotConfiguration config, QueryExecutor 
queryExecutor, ServerMetrics metrics,
+                           LongAccumulator latestQueryTime, 
ThreadResourceUsageAccountant resourceUsageAccountant) {
+    super(config, queryExecutor, new WorkloadResourceManager(config, 
resourceUsageAccountant), metrics,
+        latestQueryTime);
+    _serverMetrics = metrics;
+    _workloadBudgetManager = 
Tracing.ThreadAccountantOps.getWorkloadBudgetManager();
+  }
+
+  @Override
+  public String name() {
+    return "WorkloadScheduler";
+  }
+
+  @Override
+  public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+    if (!_isRunning) {
+      return shuttingDown(queryRequest);
+    }
+
+    boolean isSecondary = 
QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions());

Review Comment:
   if isSecondary is true, use the configs here to populate workload name. Add 
a comment mentioning that this is needed for backward compatibility when 
migrating from BinaryWorkloadScheduler to WorkloadScheduler. 



##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/WorkloadScheduler.java:
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.accounting.WorkloadBudgetManager;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
+import org.apache.pinot.core.query.scheduler.resources.WorkloadResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler implementation that supports query admission control based on 
workload-specific budgets.
+ *
+ * <p>This class integrates with the {@link WorkloadBudgetManager} to apply 
CPU and memory budget enforcement
+ * for different workloads, including primary and secondary workloads.</p>
+ *
+ * <p>Secondary workload configuration is used for queries tagged as 
"secondary". Queries that exceed their budget
+ * will be rejected.</p>
+ *
+ */
+public class WorkloadScheduler extends QueryScheduler {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(WorkloadScheduler.class);
+
+  private final WorkloadBudgetManager _workloadBudgetManager;
+  private final ServerMetrics _serverMetrics;
+
+  public WorkloadScheduler(PinotConfiguration config, QueryExecutor 
queryExecutor, ServerMetrics metrics,
+                           LongAccumulator latestQueryTime, 
ThreadResourceUsageAccountant resourceUsageAccountant) {
+    super(config, queryExecutor, new WorkloadResourceManager(config, 
resourceUsageAccountant), metrics,
+        latestQueryTime);
+    _serverMetrics = metrics;
+    _workloadBudgetManager = 
Tracing.ThreadAccountantOps.getWorkloadBudgetManager();
+  }
+
+  @Override
+  public String name() {
+    return "WorkloadScheduler";
+  }
+
+  @Override
+  public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+    if (!_isRunning) {
+      return shuttingDown(queryRequest);
+    }
+
+    boolean isSecondary = 
QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions());
+    String workloadName = 
QueryOptionsUtils.getWorkloadName(queryRequest.getQueryContext().getQueryOptions());
+    if (!_workloadBudgetManager.canAdmitQuery(workloadName, isSecondary)) {
+      String tableName = 
TableNameBuilder.extractRawTableName(queryRequest.getTableNameWithType());

Review Comment:
   Explore queueing the requests here (until a certain threshold is reached) 
and admit the queries. This can be a TODO for later but do add a comment. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to