vvivekiyer commented on code in PR #16018:
URL: https://github.com/apache/pinot/pull/16018#discussion_r2219885294
##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -212,7 +212,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
/**
* Approximate heap bytes used by the mutable JSON index at the time of index close.
*/
- MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false);
+ MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false),
+ // Workload Budget exceeded counter
+ WORKLOAD_BUDGET_EXCEEDED("workloadBudgetExceeded", false, "Number of times workload budget exceeded");
Review Comment:
I think it makes sense to have this metric on the broker too. That way we
can count the queries that failed because they exceeded the workload
budget. It would be good to see both broker-host level and <broker,
workload> level metrics.
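As a rough sketch of the two granularities, here is a version using plain JDK
counters rather than Pinot's metrics classes. `WorkloadRejectionCounters` and
its method names are hypothetical and exist only for illustration; the real
change would presumably go through the broker metrics API instead.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper, not part of Pinot: counts budget rejections at two levels.
final class WorkloadRejectionCounters {
  // One counter for the whole broker host.
  private final LongAdder _hostLevel = new LongAdder();
  // One counter per workload name on this broker.
  private final ConcurrentHashMap<String, LongAdder> _perWorkload = new ConcurrentHashMap<>();

  // Record a single rejected query for the given workload.
  void recordRejection(String workloadName) {
    _hostLevel.increment();
    _perWorkload.computeIfAbsent(workloadName, k -> new LongAdder()).increment();
  }

  long hostLevelCount() {
    return _hostLevel.sum();
  }

  long workloadCount(String workloadName) {
    LongAdder adder = _perWorkload.get(workloadName);
    return adder == null ? 0L : adder.sum();
  }
}
```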
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -199,6 +203,18 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage);
}
+ String workloadName = sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.WORKLOAD_NAME);
+ boolean isSecondary = Boolean.parseBoolean(sqlNodeAndOptions.getOptions()
+ .getOrDefault(Broker.Request.QueryOptionKey.IS_SECONDARY_WORKLOAD, "false"));
+ if (workloadName != null && _workloadBudgetManager != null
+ && !_workloadBudgetManager.canAdmitQuery(workloadName, isSecondary)) {
+ String errorMessage = "Request " + requestId + ": " + query + " exceeds query quota for workload: "
+ + workloadName;
+ LOGGER.info(errorMessage);
+ requestContext.setErrorCode(QueryErrorCode.WORKLOAD_QUOTA_EXCEEDED);
+ return new BrokerResponseNative(QueryErrorCode.WORKLOAD_QUOTA_EXCEEDED, errorMessage);
Review Comment:
Is this admission control on the broker even needed? Can't we just rely on
the Accountant to catch these queries and kill them?
##########
pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java:
##########
@@ -52,6 +52,7 @@ public enum QueryErrorCode {
BROKER_REQUEST_SEND(425, "BrokerRequestSend"),
SERVER_NOT_RESPONDING(427, "ServerNotResponding"),
TOO_MANY_REQUESTS(429, "TooManyRequests"),
+ WORKLOAD_QUOTA_EXCEEDED(429, "WorkloadQuotaExceeded"),
Review Comment:
Can we also change all other places (the accountant, etc.) to propagate this
error code when terminating queries because of workload budget exhaustion?
##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -47,10 +48,32 @@ public WorkloadBudgetManager(PinotConfiguration config) {
}
_enforcementWindowMs = config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
+ initSecondaryWorkloadBudget(config);
startBudgetResetTask();
LOGGER.info("WorkloadBudgetManager initialized with enforcement window: {}ms", _enforcementWindowMs);
}
+ /**
+ * This budget is primarily meant to be used for queries that need to be issued in a low priority manner.
+ * This is fixed budget allocated during host startup and used across all secondary queries.
+ */
+ private void initSecondaryWorkloadBudget(PinotConfiguration config) {
+ _secondaryWorkloadName = config.getProperty(
Review Comment:
Let us make this optional, since not all users will be migrating from
BinaryWorkloadScheduler.
##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -47,10 +48,32 @@ public WorkloadBudgetManager(PinotConfiguration config) {
}
_enforcementWindowMs = config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
+ initSecondaryWorkloadBudget(config);
startBudgetResetTask();
LOGGER.info("WorkloadBudgetManager initialized with enforcement window: {}ms", _enforcementWindowMs);
}
+ /**
+ * This budget is primarily meant to be used for queries that need to be issued in a low priority manner.
+ * This is fixed budget allocated during host startup and used across all secondary queries.
+ */
+ private void initSecondaryWorkloadBudget(PinotConfiguration config) {
+ _secondaryWorkloadName = config.getProperty(
+ CommonConstants.Accounting.CONFIG_OF_SECONDARY_WORKLOAD_NAME,
+ CommonConstants.Accounting.DEFAULT_SECONDARY_WORKLOAD_NAME);
+
+ double secondaryCpuPercentage = config.getProperty(
+ CommonConstants.Accounting.CONFIG_OF_SECONDARY_WORKLOAD_CPU_PERCENTAGE,
+ CommonConstants.Accounting.DEFAULT_SECONDARY_WORKLOAD_CPU_PERCENTAGE);
+
+ // The Secondary CPU budget is based on the CPU percentage allocated for secondary workload.
+ // The memory budget is set to Long.MAX_VALUE for now, since we do not have a specific memory budget for
+ // secondary queries.
+ long secondaryCpuBudget = (long) (secondaryCpuPercentage * _enforcementWindowMs * 100_000L);
Review Comment:
This computation seems off.
Consider a host with 50 threads: if secondaryCpuPercentage=10, you'll
be allocating 20% of CPU to secondary queries?
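To make the check concrete, here is a small sketch that puts the formula from
the diff next to a thread-aware variant. It assumes the budget is expressed in
CPU nanoseconds per enforcement window and that the percentage is on a 0-100
scale; neither is confirmed by the hunk shown. `SecondaryBudgetMath`,
`capacityNs` and `budgetScaledByThreads` are hypothetical names. Note that the
formula as written has no thread-count term, so the share of the host it
represents depends on how many threads are actually running.

```java
// Hypothetical helper for illustration only; not part of the PR.
final class SecondaryBudgetMath {
  static final long NANOS_PER_MILLI = 1_000_000L;

  // The formula from the diff, unchanged.
  static long budgetAsWritten(double cpuPercentage, long windowMs) {
    return (long) (cpuPercentage * windowMs * 100_000L);
  }

  // Assumed total CPU nanoseconds available in one window when numThreads run flat out.
  static long capacityNs(int numThreads, long windowMs) {
    return numThreads * windowMs * NANOS_PER_MILLI;
  }

  // Thread-aware alternative: cpuPercentage percent of the assumed capacity above.
  static long budgetScaledByThreads(double cpuPercentage, int numThreads, long windowMs) {
    return (long) (cpuPercentage * numThreads * windowMs * NANOS_PER_MILLI / 100.0);
  }
}
```

Comparing `budgetAsWritten(10, windowMs)` against `capacityNs(50, windowMs)`
for the configured window shows what fraction of the host the secondary
workload would actually receive.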
##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -145,6 +168,43 @@ private void startBudgetResetTask() {
}, _enforcementWindowMs, _enforcementWindowMs, TimeUnit.MILLISECONDS);
}
+ /**
+ * Determines whether a query for the given workload can be admitted under CPU-only budgets.
+ *
+ * <p>Admission rules:
+ * <ol>
+ * <li>If the manager is disabled or no budget exists for the workload, always admit.</li>
+ * <li>If CPU budget remains above zero, admit immediately.</li>
+ * <li>Otherwise, reject (return false).</li>
+ * </ol>
+ *
+ * <p>Note: This method currently uses a strict check, where CPU and memory budgets must be above zero.
+ * This may be relaxed in the future to allow for a percentage of other remaining budget to be used. At that point,
+ * we can have different admission policies like: Strict, Stealing, etc.
+ *
+ * @param workload the workload identifier to check budget for
+ * @return true if the query may be accepted; false if budget is insufficient
+ */
+ public boolean canAdmitQuery(String workload, boolean isSecondary) {
Review Comment:
canAdmitQuery should take only the workload as a parameter. The workload name
will be "defaultSecondary" if the query belongs to the secondary workload, and
the init function will already have allocated budget for it. This also
eliminates the need for the member variable _secondaryWorkloadName.
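Something along these lines, as a simplified sketch rather than the actual
WorkloadBudgetManager. `BudgetManagerSketch`, `setBudget` and `charge` are
hypothetical names, only the CPU budget is modeled, and the budget reset task
is left out. The point is that the secondary workload is registered at init
like any other workload, so the admission check needs only the name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for WorkloadBudgetManager (CPU budget only).
final class BudgetManagerSketch {
  // Assumed default name, taken from the review comment.
  static final String DEFAULT_SECONDARY_WORKLOAD = "defaultSecondary";

  // Remaining CPU budget in nanoseconds, keyed by workload name.
  private final ConcurrentHashMap<String, AtomicLong> _remainingCpuNs = new ConcurrentHashMap<>();

  BudgetManagerSketch(long secondaryCpuBudgetNs) {
    // The secondary budget is allocated once at init, like any other workload.
    _remainingCpuNs.put(DEFAULT_SECONDARY_WORKLOAD, new AtomicLong(secondaryCpuBudgetNs));
  }

  void setBudget(String workload, long cpuBudgetNs) {
    _remainingCpuNs.put(workload, new AtomicLong(cpuBudgetNs));
  }

  // Deduct consumed CPU time; workloads without a budget are ignored.
  void charge(String workload, long cpuNs) {
    AtomicLong remaining = _remainingCpuNs.get(workload);
    if (remaining != null) {
      remaining.addAndGet(-cpuNs);
    }
  }

  // Single-parameter check: admit if no budget is registered or budget remains.
  boolean canAdmitQuery(String workload) {
    AtomicLong remaining = _remainingCpuNs.get(workload);
    return remaining == null || remaining.get() > 0;
  }
}
```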
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/WorkloadResourceManager.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler.resources;
+
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+public class WorkloadResourceManager extends ResourceManager {
Review Comment:
I don't think this class is necessary. We can just use the
UnboundedResourceManager for now. The only thing this is doing differently is
setting thread limits based on the policy. But why is that needed here?
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/WorkloadScheduler.java:
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.accounting.WorkloadBudgetManager;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
+import org.apache.pinot.core.query.scheduler.resources.WorkloadResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler implementation that supports query admission control based on workload-specific budgets.
+ *
+ * <p>This class integrates with the {@link WorkloadBudgetManager} to apply CPU and memory budget enforcement
+ * for different workloads, including primary and secondary workloads.</p>
+ *
+ * <p>Secondary workload configuration is used for queries tagged as "secondary". Queries that exceed their budget
+ * will be rejected.</p>
+ *
+ */
+public class WorkloadScheduler extends QueryScheduler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadScheduler.class);
+
+ private final WorkloadBudgetManager _workloadBudgetManager;
+ private final ServerMetrics _serverMetrics;
+
+ public WorkloadScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics metrics,
+ LongAccumulator latestQueryTime, ThreadResourceUsageAccountant resourceUsageAccountant) {
+ super(config, queryExecutor, new WorkloadResourceManager(config, resourceUsageAccountant), metrics,
+ latestQueryTime);
+ _serverMetrics = metrics;
+ _workloadBudgetManager = Tracing.ThreadAccountantOps.getWorkloadBudgetManager();
+ }
+
+ @Override
+ public String name() {
+ return "WorkloadScheduler";
+ }
+
+ @Override
+ public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+ if (!_isRunning) {
+ return shuttingDown(queryRequest);
+ }
+
+ boolean isSecondary = QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions());
Review Comment:
If isSecondary is true, use the configs here to populate the workload name.
Add a comment mentioning that this is needed for backward compatibility when
migrating from BinaryWorkloadScheduler to WorkloadScheduler.
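Roughly like the sketch below. `WorkloadNameResolver` is a hypothetical
helper, and the option keys "isSecondaryWorkload" and "workloadName" are
assumptions standing in for whatever QueryOptionsUtils reads; the configured
secondary name would come from the server config.

```java
import java.util.Map;

// Hypothetical helper for illustration; option keys below are assumptions.
final class WorkloadNameResolver {
  static String resolve(Map<String, String> queryOptions, String configuredSecondaryName) {
    // Backward compatibility: queries migrating from BinaryWorkloadScheduler are only
    // tagged as secondary and carry no workload name, so map them to the configured one.
    boolean isSecondary = Boolean.parseBoolean(queryOptions.getOrDefault("isSecondaryWorkload", "false"));
    if (isSecondary) {
      return configuredSecondaryName;
    }
    // May be null when the query carries no workload name.
    return queryOptions.get("workloadName");
  }
}
```

With this in place the scheduler can pass a single resolved name to the
budget check.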
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/WorkloadScheduler.java:
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.accounting.WorkloadBudgetManager;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
+import org.apache.pinot.core.query.scheduler.resources.WorkloadResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler implementation that supports query admission control based on workload-specific budgets.
+ *
+ * <p>This class integrates with the {@link WorkloadBudgetManager} to apply CPU and memory budget enforcement
+ * for different workloads, including primary and secondary workloads.</p>
+ *
+ * <p>Secondary workload configuration is used for queries tagged as "secondary". Queries that exceed their budget
+ * will be rejected.</p>
+ *
+ */
+public class WorkloadScheduler extends QueryScheduler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadScheduler.class);
+
+ private final WorkloadBudgetManager _workloadBudgetManager;
+ private final ServerMetrics _serverMetrics;
+
+ public WorkloadScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics metrics,
+ LongAccumulator latestQueryTime, ThreadResourceUsageAccountant resourceUsageAccountant) {
+ super(config, queryExecutor, new WorkloadResourceManager(config, resourceUsageAccountant), metrics,
+ latestQueryTime);
+ _serverMetrics = metrics;
+ _workloadBudgetManager = Tracing.ThreadAccountantOps.getWorkloadBudgetManager();
+ }
+
+ @Override
+ public String name() {
+ return "WorkloadScheduler";
+ }
+
+ @Override
+ public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+ if (!_isRunning) {
+ return shuttingDown(queryRequest);
+ }
+
+ boolean isSecondary = QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions());
+ String workloadName = QueryOptionsUtils.getWorkloadName(queryRequest.getQueryContext().getQueryOptions());
+ if (!_workloadBudgetManager.canAdmitQuery(workloadName, isSecondary)) {
+ String tableName = TableNameBuilder.extractRawTableName(queryRequest.getTableNameWithType());
Review Comment:
Explore queueing the requests here (until a certain threshold is reached)
and then admitting them. This can be a TODO for later, but do add a comment.
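For the TODO, one possible shape is a bounded pending queue, sketched below
under the assumption that something (for example the budget reset task)
drains it later. `BoundedAdmissionQueue`, `Decision`, `offer` and `pollNext`
are hypothetical names, and timeouts for queued requests are not handled.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: hold over-budget requests up to a cap instead of rejecting immediately.
final class BoundedAdmissionQueue<T> {
  enum Decision { ADMITTED, QUEUED, REJECTED }

  private final Queue<T> _pending = new ArrayDeque<>();
  private final int _maxPending;

  BoundedAdmissionQueue(int maxPending) {
    _maxPending = maxPending;
  }

  // Admit directly only when budget is available and nothing is waiting (keeps FIFO order);
  // otherwise queue until the threshold is reached, then reject.
  synchronized Decision offer(T request, boolean budgetAvailable) {
    if (budgetAvailable && _pending.isEmpty()) {
      return Decision.ADMITTED;
    }
    if (_pending.size() < _maxPending) {
      _pending.add(request);
      return Decision.QUEUED;
    }
    return Decision.REJECTED;
  }

  // Called when budget frees up; returns the oldest queued request, or null if none.
  synchronized T pollNext() {
    return _pending.poll();
  }
}
```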