This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3a75d4abb2 Binary Workload Scheduler for constrained execution of a
set of queries (#13847)
3a75d4abb2 is described below
commit 3a75d4abb2c897b5b368ea55ffd7a4e6ea8ba0e5
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Sat Aug 24 09:53:01 2024 -0700
Binary Workload Scheduler for constrained execution of a set of queries
(#13847)
* Binary Workload Scheduler
* Address review comments
---
.../apache/pinot/common/metrics/ServerMeter.java | 4 +
.../apache/pinot/common/metrics/ServerTimer.java | 3 +
.../common/utils/config/QueryOptionsUtils.java | 4 +
.../query/scheduler/BinaryWorkloadScheduler.java | 219 +++++++++++++++++++++
.../query/scheduler/QuerySchedulerFactory.java | 4 +
.../query/scheduler/SecondaryWorkloadQueue.java | 167 ++++++++++++++++
.../resources/BinaryWorkloadResourceManager.java | 105 ++++++++++
.../query/scheduler/QuerySchedulerFactoryTest.java | 5 +
.../apache/pinot/spi/utils/CommonConstants.java | 8 +
9 files changed, 519 insertions(+)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 8cadec6bfc..799c8790c2 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -30,6 +30,10 @@ public enum ServerMeter implements AbstractMetrics.Meter {
REQUEST_DESERIALIZATION_EXCEPTIONS("exceptions", true),
RESPONSE_SERIALIZATION_EXCEPTIONS("exceptions", true),
SCHEDULING_TIMEOUT_EXCEPTIONS("exceptions", true),
+ NUM_SECONDARY_QUERIES("queries", false),
+ NUM_SECONDARY_QUERIES_SCHEDULED("queries", false),
+ SERVER_OUT_OF_CAPACITY_EXCEPTIONS("exceptions", false),
+
QUERY_EXECUTION_EXCEPTIONS("exceptions", false),
HELIX_ZOOKEEPER_RECONNECTS("reconnects", true),
DELETED_SEGMENT_COUNT("segments", false),
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index b3e5e70641..63b42440a6 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -59,6 +59,9 @@ public enum ServerTimer implements AbstractMetrics.Timer {
DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
"Total time taken to delete expired dedup primary keys based on
metadataTTL or deletedKeysTTL"),
+ SECONDARY_Q_WAIT_TIME_MS("milliseconds", false,
+ "Time spent waiting in the secondary queue when BinaryWorkloadScheduler
is used."),
+
// Multi-stage
/**
* Time spent building the hash table for the join.
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index fe1b348a28..d1900f6a8f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -310,4 +310,8 @@ public class QueryOptionsUtils {
public static boolean isSkipUnavailableServers(Map<String, String>
queryOptions) {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UNAVAILABLE_SERVERS));
}
+
+  /**
+   * Returns whether the query is tagged as a secondary workload via the {@code isSecondaryWorkload} query option.
+   * A missing option, or any value other than a case-insensitive "true", yields {@code false}.
+   */
+  public static boolean isSecondaryWorkload(Map<String, String> queryOptions) {
+    String secondaryWorkloadFlag = queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD);
+    return Boolean.parseBoolean(secondaryWorkloadFlag);
+  }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
new file mode 100644
index 0000000000..93e84c1e6a
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import
org.apache.pinot.core.query.scheduler.resources.BinaryWorkloadResourceManager;
+import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This scheduler is designed to deal with two types of workloads:
+ * <ol>
+ *   <li>Primary workloads: regular queries from the application.</li>
+ *   <li>Secondary workloads: ad hoc queries fired from tools, testing, etc.</li>
+ * </ol>
+ *
+ * <p><b>Primary workload queries</b> are executed with priority and submitted to the runner threads as soon as they
+ * arrive. The resources used by a primary workload query are not capped.
+ *
+ * <p><b>Secondary workload queries</b> are identified using the query option {@code SET isSecondaryWorkload=true}
+ * and are contained as follows:
+ * <ul>
+ *   <li>the number of runner threads available to process secondary queries is restricted;</li>
+ *   <li>the total number of worker threads available to process a single secondary query is restricted;</li>
+ *   <li>the total number of worker threads available to process all in-progress secondary queries is
+ *   restricted.</li>
+ * </ul>
+ */
+public class BinaryWorkloadScheduler extends QueryScheduler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BinaryWorkloadScheduler.class);
+
+  public static final String MAX_SECONDARY_QUERIES = "binarywlm.maxSecondaryRunnerThreads";
+  public static final int DEFAULT_MAX_SECONDARY_QUERIES = 5;
+
+  // Secondary Workload Runners.
+  private final int _numSecondaryRunners;
+  // One permit per secondary query that may run concurrently. The scheduler thread acquires a permit before taking a
+  // query off the secondary queue. The permit is returned by the query's completion listener, or released right away
+  // by the scheduler thread when no query ends up being submitted.
+  private final Semaphore _secondaryRunnerSemaphore;
+
+  private final SecondaryWorkloadQueue _secondaryQueryQ;
+
+  Thread _scheduler;
+
+  public BinaryWorkloadScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics metrics,
+      LongAccumulator latestQueryTime) {
+    super(config, queryExecutor, new BinaryWorkloadResourceManager(config), metrics, latestQueryTime);
+
+    _secondaryQueryQ = new SecondaryWorkloadQueue(config, _resourceManager);
+    _numSecondaryRunners = config.getProperty(MAX_SECONDARY_QUERIES, DEFAULT_MAX_SECONDARY_QUERIES);
+    LOGGER.info("numSecondaryRunners={}", _numSecondaryRunners);
+    _secondaryRunnerSemaphore = new Semaphore(_numSecondaryRunners);
+  }
+
+  @Override
+  public String name() {
+    return "BinaryWorkloadScheduler";
+  }
+
+  /**
+   * Submits a query for execution.
+   *
+   * <p>Primary workload queries are handed to the query runners immediately. Secondary workload queries are placed on
+   * the secondary queue and submitted later by the scheduler thread.
+   *
+   * @return a future for the serialized response. It is an immediate error response if the scheduler is not running,
+   *         the secondary queue is out of capacity, or enqueuing fails unexpectedly.
+   */
+  @Override
+  public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+    if (!_isRunning) {
+      return immediateErrorResponse(queryRequest, QueryException.SERVER_SCHEDULER_DOWN_ERROR);
+    }
+
+    queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
+    if (!QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions())) {
+      // Primary workload: no queuing, submit straight to the query runners.
+      QueryExecutorService queryExecutorService = _resourceManager.getExecutorService(queryRequest, null);
+      ListenableFutureTask<byte[]> queryTask = createQueryFutureTask(queryRequest, queryExecutorService);
+      _resourceManager.getQueryRunners().submit(queryTask);
+      return queryTask;
+    }
+
+    // Secondary workload: enqueue. The scheduler thread submits it once a secondary runner permit is available.
+    final SchedulerQueryContext schedQueryContext = new SchedulerQueryContext(queryRequest);
+    try {
+      // Update metrics
+      _serverMetrics.addMeteredTableValue(queryRequest.getTableNameWithType(), ServerMeter.NUM_SECONDARY_QUERIES, 1L);
+
+      _secondaryQueryQ.put(schedQueryContext);
+    } catch (OutOfCapacityException e) {
+      LOGGER.error("Out of capacity for query {} table {}, message: {}", queryRequest.getRequestId(),
+          queryRequest.getTableNameWithType(), e.getMessage());
+      return immediateErrorResponse(queryRequest, QueryException.SERVER_OUT_OF_CAPACITY_ERROR);
+    } catch (Exception e) {
+      // We should not throw any other exception other than OutOfCapacityException. Signal that there's an issue with
+      // the scheduler if any other exception is thrown. The exception itself is passed as the trailing argument so
+      // SLF4J logs its stack trace; this is an unexpected path and the message alone is not enough to debug it.
+      LOGGER.error("Internal error for query {} table {}, message {}", queryRequest.getRequestId(),
+          queryRequest.getTableNameWithType(), e.getMessage(), e);
+      return immediateErrorResponse(queryRequest, QueryException.SERVER_SCHEDULER_DOWN_ERROR);
+    }
+    return schedQueryContext.getResultFuture();
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    _scheduler = getScheduler();
+    _scheduler.setName("scheduler");
+    // TODO: Consider setting a lower priority to avoid a busy loop when all threads are busy processing queries.
+    _scheduler.setPriority(Thread.MAX_PRIORITY);
+    _scheduler.setDaemon(true);
+    _scheduler.start();
+  }
+
+  /**
+   * Builds the scheduler thread.
+   *
+   * <p>While the scheduler is running, the thread repeatedly does the following:
+   * <ol>
+   *   <li>acquires a secondary runner permit;</li>
+   *   <li>takes the next schedulable secondary query from the queue;</li>
+   *   <li>submits that query to the query runners.</li>
+   * </ol>
+   * When the thread exits after a shutdown, every query still pending in the queue is failed.
+   */
+  private Thread getScheduler() {
+    return new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (_isRunning) {
+          try {
+            _secondaryRunnerSemaphore.acquire();
+          } catch (InterruptedException e) {
+            if (!_isRunning) {
+              LOGGER.info("Shutting down scheduler");
+            } else {
+              LOGGER.error("Interrupt while acquiring semaphore. Exiting.", e);
+            }
+            break;
+          }
+          // Becomes true once the permit acquired above belongs to a submitted query, whose completion listener
+          // releases it. Otherwise the permit must be released here: take() returned null on interrupt, or something
+          // threw before submission. Leaking it would shrink secondary concurrency, and checkStopResourceManager()
+          // would never see all permits returned.
+          boolean permitHandedOff = false;
+          try {
+            final SchedulerQueryContext request = _secondaryQueryQ.take();
+            if (request == null) {
+              // take() only returns null when interrupted; the loop condition re-checks _isRunning.
+              continue;
+            }
+            ServerQueryRequest queryRequest = request.getQueryRequest();
+            final QueryExecutorService executor =
+                _resourceManager.getExecutorService(queryRequest, request.getSchedulerGroup());
+            final ListenableFutureTask<byte[]> queryFutureTask = createQueryFutureTask(queryRequest, executor);
+            queryFutureTask.addListener(new Runnable() {
+              @Override
+              public void run() {
+                executor.releaseWorkers();
+                request.getSchedulerGroup().endQuery();
+                _secondaryRunnerSemaphore.release();
+                checkStopResourceManager();
+              }
+            }, MoreExecutors.directExecutor());
+
+            // Update metrics
+            updateSecondaryWorkloadMetrics(queryRequest);
+
+            request.setResultFuture(queryFutureTask);
+            request.getSchedulerGroup().startQuery();
+            _resourceManager.getQueryRunners().submit(queryFutureTask);
+            permitHandedOff = true;
+          } catch (Throwable t) {
+            LOGGER.error(
+                "Error in scheduler thread. This is indicative of a bug. Please report this. Server will continue "
+                    + "with errors", t);
+          } finally {
+            if (!permitHandedOff) {
+              _secondaryRunnerSemaphore.release();
+            }
+          }
+        }
+        if (_isRunning) {
+          throw new RuntimeException("FATAL: Scheduler thread is quitting.....something went horribly wrong.....!!!");
+        } else {
+          failAllPendingQueries();
+        }
+      }
+    });
+  }
+
+  /**
+   * Records the time the query spent between arrival and being scheduled, and counts it as a scheduled secondary
+   * query.
+   */
+  private void updateSecondaryWorkloadMetrics(ServerQueryRequest queryRequest) {
+    long timeInQMs = System.currentTimeMillis() - queryRequest.getTimerContext().getQueryArrivalTimeMs();
+    _serverMetrics.addTimedTableValue(queryRequest.getTableNameWithType(), ServerTimer.SECONDARY_Q_WAIT_TIME_MS,
+        timeInQMs, TimeUnit.MILLISECONDS);
+    _serverMetrics.addMeteredTableValue(queryRequest.getTableNameWithType(),
+        ServerMeter.NUM_SECONDARY_QUERIES_SCHEDULED, 1L);
+  }
+
+  @Override
+  public void stop() {
+    super.stop();
+    // without this, scheduler will never stop if there are no pending queries
+    if (_scheduler != null) {
+      _scheduler.interrupt();
+    }
+  }
+
+  /**
+   * Stops the resource manager once the scheduler is shut down and every secondary runner permit has been returned,
+   * i.e. no secondary query is still in flight.
+   */
+  private void checkStopResourceManager() {
+    if (!_isRunning && _secondaryRunnerSemaphore.availablePermits() == _numSecondaryRunners) {
+      _resourceManager.stop();
+    }
+  }
+
+  /** Drains the secondary queue and completes every drained query with a scheduler-down error response. */
+  private synchronized void failAllPendingQueries() {
+    List<SchedulerQueryContext> pending = _secondaryQueryQ.drain();
+    for (SchedulerQueryContext queryContext : pending) {
+      queryContext.setResultFuture(
+          immediateErrorResponse(queryContext.getQueryRequest(), QueryException.SERVER_SCHEDULER_DOWN_ERROR));
+    }
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
index 86c7170d93..69dd8bed36 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
@@ -45,6 +45,7 @@ public class QuerySchedulerFactory {
public static final String FCFS_ALGORITHM = "fcfs";
public static final String TOKEN_BUCKET_ALGORITHM = "tokenbucket";
public static final String BOUNDED_FCFS_ALGORITHM = "bounded_fcfs";
+ public static final String BINARY_WORKLOAD_ALGORITHM = "binary_workload";
public static final String ALGORITHM_NAME_CONFIG_KEY = "name";
public static final String DEFAULT_QUERY_SCHEDULER_ALGORITHM =
FCFS_ALGORITHM;
@@ -73,6 +74,9 @@ public class QuerySchedulerFactory {
case BOUNDED_FCFS_ALGORITHM:
scheduler = BoundedFCFSScheduler.create(schedulerConfig,
queryExecutor, serverMetrics, latestQueryTime);
break;
+ case BINARY_WORKLOAD_ALGORITHM:
+ scheduler = new BinaryWorkloadScheduler(schedulerConfig,
queryExecutor, serverMetrics, latestQueryTime);
+ break;
default:
scheduler =
getQuerySchedulerByClassName(schedulerName, schedulerConfig,
queryExecutor, serverMetrics, latestQueryTime);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java
new file mode 100644
index 0000000000..82736dbf0f
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.fcfs.FCFSSchedulerGroup;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Queue to maintain secondary workload queries. Used by the BinaryWorkloadScheduler.
+ *
+ * <p>All secondary queries share a single FCFS scheduler group. Every access to that group goes through
+ * {@code _queueLock}.
+ */
+public class SecondaryWorkloadQueue {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SecondaryWorkloadQueue.class);
+  private static final String SECONDARY_WORKLOAD_GROUP_NAME = "Secondary";
+
+  public static final String SECONDARY_QUEUE_QUERY_TIMEOUT = "binarywlm.secondaryQueueQueryTimeout";
+  private static final int DEFAULT_SECONDARY_QUEUE_QUERY_TIMEOUT_SEC = 40;
+
+  public static final String MAX_PENDING_SECONDARY_QUERIES = "binarywlm.maxPendingSecondaryQueries";
+  private static final int DEFAULT_MAX_PENDING_SECONDARY_QUERIES = 20;
+
+  public static final String QUEUE_WAKEUP_MS = "binarywlm.queueWakeupMs";
+  private static final int DEFAULT_WAKEUP_MS = 1;
+
+  // Per-instance poll interval for take(). It must not be static: the constructor assigns it from this instance's
+  // config, and a static field would let each new queue overwrite the value used by every other queue.
+  private final int _wakeUpTimeMs;
+  private final int _maxPendingPerGroup;
+
+  private final SchedulerGroup _schedulerGroup;
+
+  private final Lock _queueLock = new ReentrantLock();
+  private final Condition _queryReaderCondition = _queueLock.newCondition();
+  private final ResourceManager _resourceManager;
+  // Queue timeout in ms, used to compute the cutoff passed to trimExpired(). Stored as a long so that converting a
+  // large configured timeout from seconds cannot overflow an int.
+  private final long _queryDeadlineMs;
+
+  public SecondaryWorkloadQueue(PinotConfiguration config, ResourceManager resourceManager) {
+    Preconditions.checkNotNull(config);
+    Preconditions.checkNotNull(resourceManager);
+
+    _queryDeadlineMs = TimeUnit.SECONDS.toMillis(
+        config.getProperty(SECONDARY_QUEUE_QUERY_TIMEOUT, DEFAULT_SECONDARY_QUEUE_QUERY_TIMEOUT_SEC));
+    _wakeUpTimeMs = config.getProperty(QUEUE_WAKEUP_MS, DEFAULT_WAKEUP_MS);
+    _maxPendingPerGroup = config.getProperty(MAX_PENDING_SECONDARY_QUERIES, DEFAULT_MAX_PENDING_SECONDARY_QUERIES);
+    LOGGER.info("queryDeadlineMs={}, wakeupTimeMs={},maxPendingPerGroup={}", _queryDeadlineMs, _wakeUpTimeMs,
+        _maxPendingPerGroup);
+    _schedulerGroup = new FCFSSchedulerGroup(SECONDARY_WORKLOAD_GROUP_NAME);
+    _resourceManager = resourceManager;
+  }
+
+  /**
+   * Adds a query to the secondary workload queue and wakes up a waiting reader.
+   *
+   * @param query query to enqueue; must not be null
+   * @throws OutOfCapacityException if the group already has the maximum number of pending queries and its reserved
+   *         threads are at or above the resource manager's table thread hard limit
+   */
+  public void put(SchedulerQueryContext query)
+      throws OutOfCapacityException {
+    Preconditions.checkNotNull(query);
+    _queueLock.lock();
+    try {
+      checkSchedulerGroupCapacity(query);
+      query.setSchedulerGroupContext(_schedulerGroup);
+      _schedulerGroup.addLast(query);
+      _queryReaderCondition.signal();
+    } finally {
+      _queueLock.unlock();
+    }
+  }
+
+  /**
+   * Blocking call that removes and returns the next query that can be scheduled.
+   *
+   * <p>Besides waking up on {@link #put}, the wait times out every {@code _wakeUpTimeMs} ms and retries. Whether a
+   * query can be taken also depends on {@code _resourceManager.canSchedule()}, and {@code put} does not signal on
+   * changes to that.
+   *
+   * @return the next query, or {@code null} if the calling thread was interrupted while waiting. The interrupt is
+   *         consumed here, so callers should treat {@code null} as a cue to re-check whether they should keep running.
+   */
+  @Nullable
+  public SchedulerQueryContext take() {
+    _queueLock.lock();
+    try {
+      SchedulerQueryContext schedulerQueryContext;
+      while ((schedulerQueryContext = takeNextInternal()) == null) {
+        try {
+          _queryReaderCondition.await(_wakeUpTimeMs, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          return null;
+        }
+      }
+      return schedulerQueryContext;
+    } finally {
+      _queueLock.unlock();
+    }
+  }
+
+  /**
+   * Removes and returns all queries currently pending in the queue, in FCFS order.
+   */
+  public List<SchedulerQueryContext> drain() {
+    List<SchedulerQueryContext> pending = new ArrayList<>();
+    _queueLock.lock();
+    try {
+      while (!_schedulerGroup.isEmpty()) {
+        pending.add(_schedulerGroup.removeFirst());
+      }
+    } finally {
+      _queueLock.unlock();
+    }
+    return pending;
+  }
+
+  /**
+   * Trims queries older than the queue timeout, then removes and returns the head of the group if the resource
+   * manager allows scheduling it. Must be called with {@code _queueLock} held.
+   *
+   * @return the next query, or {@code null} if the group is empty or cannot be scheduled right now
+   */
+  @Nullable
+  private SchedulerQueryContext takeNextInternal() {
+    long startTimeMs = System.currentTimeMillis();
+    long deadlineEpochMillis = startTimeMs - _queryDeadlineMs;
+
+    _schedulerGroup.trimExpired(deadlineEpochMillis);
+    if (_schedulerGroup.isEmpty() || !_resourceManager.canSchedule(_schedulerGroup)) {
+      return null;
+    }
+
+    if (LOGGER.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("SchedulerInfo:");
+      sb.append(_schedulerGroup.toString());
+      ServerQueryRequest queryRequest = _schedulerGroup.peekFirst().getQueryRequest();
+      sb.append(String.format(" Group: %s: [%d,%d,%d,%d]", _schedulerGroup.name(),
+          queryRequest.getTimerContext().getQueryArrivalTimeMs(), queryRequest.getRequestId(),
+          queryRequest.getSegmentsToQuery().size(), startTimeMs));
+      LOGGER.debug(sb.toString());
+    }
+
+    return _schedulerGroup.removeFirst();
+  }
+
+  /**
+   * Rejects a new query when the group is saturated, i.e. it has at least {@code _maxPendingPerGroup} pending queries
+   * AND its reserved threads have reached the table thread hard limit.
+   */
+  private void checkSchedulerGroupCapacity(SchedulerQueryContext query)
+      throws OutOfCapacityException {
+    if (_schedulerGroup.numPending() >= _maxPendingPerGroup
+        && _schedulerGroup.totalReservedThreads() >= _resourceManager.getTableThreadsHardLimit()) {
+      throw new OutOfCapacityException(String.format(
+          "SchedulerGroup %s is out of capacity. numPending: %d, maxPending: %d, reservedThreads: %d "
+              + "threadsHardLimit: %d", _schedulerGroup.name(), _schedulerGroup.numPending(), _maxPendingPerGroup,
+          _schedulerGroup.totalReservedThreads(), _resourceManager.getTableThreadsHardLimit()));
+    }
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
new file mode 100644
index 0000000000..fb7ffd93e3
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler.resources;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link ResourceManager} used by the BinaryWorkloadScheduler.
+ *
+ * <p>Primary workload queries receive an executor that forwards tasks straight to the shared query worker pool.
+ * Secondary workload queries receive a {@link BoundedAccountingExecutor} whose thread budget is derived from a
+ * {@link ResourceLimitPolicy} built from the server config.
+ */
+public class BinaryWorkloadResourceManager extends ResourceManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BinaryWorkloadResourceManager.class);
+  // Thread limits that apply to secondary workload queries only.
+  private final ResourceLimitPolicy _secondaryWorkloadPolicy;
+
+  public BinaryWorkloadResourceManager(PinotConfiguration config) {
+    super(config);
+    _secondaryWorkloadPolicy = new ResourceLimitPolicy(config, _numQueryWorkerThreads);
+  }
+
+  /**
+   * Returns an executor service that the query executor can use like a dedicated service for submitting jobs for
+   * parallel execution.
+   *
+   * @param query the query being scheduled; its {@code isSecondaryWorkload} option selects the executor flavor
+   * @param accountant accountant of the query's scheduler group (used only for secondary workload queries)
+   * @return for primary workload queries, an executor that submits directly to the query worker pool. For secondary
+   *         workload queries, a {@link BoundedAccountingExecutor} that limits the number of threads available to the
+   *         query, so the query executor can submit tasks without limiting their parallelism itself.
+   */
+  @Override
+  public QueryExecutorService getExecutorService(ServerQueryRequest query, SchedulerGroupAccountant accountant) {
+    boolean isSecondary = QueryOptionsUtils.isSecondaryWorkload(query.getQueryContext().getQueryOptions());
+    return isSecondary ? getSecondaryWorkloadExecutorService(query, accountant) : getPrimaryWorkloadExecutorService();
+  }
+
+  /** Table-level hard thread limit taken from the secondary workload policy. */
+  @Override
+  public int getTableThreadsHardLimit() {
+    return _secondaryWorkloadPolicy.getTableThreadsHardLimit();
+  }
+
+  /** Table-level soft thread limit taken from the secondary workload policy. */
+  @Override
+  public int getTableThreadsSoftLimit() {
+    return _secondaryWorkloadPolicy.getTableThreadsSoftLimit();
+  }
+
+  /** Executor that forwards every task to the shared query worker pool, with no per-query bound. */
+  private QueryExecutorService getPrimaryWorkloadExecutorService() {
+    return new QueryExecutorService() {
+      @Override
+      public void execute(Runnable task) {
+        _queryWorkers.submit(task);
+      }
+    };
+  }
+
+  /**
+   * Reserves worker threads for a secondary query on its group's accountant and returns an executor bounded to them.
+   *
+   * <p>The reservation is the smallest of: the policy's per-query maximum, the number of segments to query, and the
+   * threads still unreserved under the table hard limit. It is always at least 1.
+   */
+  private QueryExecutorService getSecondaryWorkloadExecutorService(ServerQueryRequest query,
+      SchedulerGroupAccountant accountant) {
+    int segmentCount = query.getSegmentsToQuery().size();
+    int perQueryCap = _secondaryWorkloadPolicy.getMaxThreadsPerQuery();
+    int queryThreadLimit = Math.max(1, Math.min(perQueryCap, segmentCount));
+
+    int unreservedThreads = _secondaryWorkloadPolicy.getTableThreadsHardLimit() - accountant.totalReservedThreads();
+    int threadsToReserve;
+    if (unreservedThreads > 0) {
+      threadsToReserve = Math.min(unreservedThreads, queryThreadLimit);
+    } else {
+      LOGGER.warn("UNEXPECTED: Attempt to schedule query uses more than the configured hard limit on threads");
+      threadsToReserve = 1;
+    }
+    Preconditions.checkState(threadsToReserve >= 1);
+
+    // The reservation is deliberately not bounded by the total number of available worker threads, so threads may be
+    // over-provisioned. Queries don't use their workers all the time, so strictly reserving workers would leave the
+    // pool under-utilized; over-provisioning keeps the worker pipeline as full as possible. The counter-argument is
+    // that over-provisioning can hurt cache lines and memory in general. The reservation is used only to prioritize
+    // by resource utilization, not as a performance-tuning mechanism.
+    accountant.addReservedThreads(threadsToReserve);
+    // TODO: For 1 thread we should have the query run in the same queryRunner thread by supplying an executor service
+    // similar to Guava's directExecutor().
+    return new BoundedAccountingExecutor(_queryWorkers, threadsToReserve, accountant);
+  }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
index a495c6c5d7..9c4c59ea1e 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
@@ -55,6 +55,11 @@ public class QuerySchedulerFactoryTest {
queryScheduler = QuerySchedulerFactory.create(config, queryExecutor,
serverMetrics, latestQueryTime);
assertTrue(queryScheduler instanceof BoundedFCFSScheduler);
+ config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY,
+ QuerySchedulerFactory.BINARY_WORKLOAD_ALGORITHM);
+ queryScheduler = QuerySchedulerFactory.create(config, queryExecutor,
serverMetrics, latestQueryTime);
+ assertTrue(queryScheduler instanceof BinaryWorkloadScheduler);
+
config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY,
TestQueryScheduler.class.getName());
queryScheduler = QuerySchedulerFactory.create(config, queryExecutor,
serverMetrics, latestQueryTime);
assertTrue(queryScheduler instanceof TestQueryScheduler);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 2bfc61e7f4..21e5c87ee7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -428,6 +428,14 @@ public class CommonConstants {
// If query submission causes an exception, still continue to submit
the query to other servers
public static final String SKIP_UNAVAILABLE_SERVERS =
"skipUnavailableServers";
+
+    // Indicates that a query belongs to a secondary workload when using the BinaryWorkloadScheduler. The
+    // BinaryWorkloadScheduler divides queries into two workloads, primary and secondary. Primary workloads are
+    // executed in an unbounded FCFS fashion, whereas secondary workloads are executed in a constrained FCFS
+    // fashion with limited compute.
+ public static final String IS_SECONDARY_WORKLOAD =
"isSecondaryWorkload";
}
public static class QueryOptionValue {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]