This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d40731f9729 Throttle SSE & MSE Tasks if Server heap usage is above a
threshold (#16271)
d40731f9729 is described below
commit d40731f9729724eada870b03871295cc12c7e690
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Fri Jul 4 12:00:20 2025 +0530
Throttle SSE & MSE Tasks if Server heap usage is above a threshold (#16271)
---
.../PerQueryCPUMemAccountantFactory.java | 9 ++
.../query/scheduler/BinaryWorkloadScheduler.java | 6 +-
.../query/scheduler/QuerySchedulerFactory.java | 19 ++-
.../query/scheduler/fcfs/BoundedFCFSScheduler.java | 6 +-
.../query/scheduler/fcfs/FCFSQueryScheduler.java | 6 +-
.../resources/BinaryWorkloadResourceManager.java | 6 +-
.../resources/PolicyBasedResourceManager.java | 5 +-
.../query/scheduler/resources/ResourceManager.java | 21 ++-
.../resources/UnboundedResourceManager.java | 5 +-
.../tokenbucket/TokenPriorityScheduler.java | 5 +-
.../accounting/ResourceManagerAccountingTest.java | 2 +-
.../scheduler/MultiLevelPriorityQueueTest.java | 9 +-
.../query/scheduler/PrioritySchedulerTest.java | 3 +-
.../query/scheduler/QuerySchedulerFactoryTest.java | 18 ++-
.../scheduler/resources/ResourceManagerTest.java | 3 +-
.../resources/UnboundedResourceManagerTest.java | 7 +-
.../apache/pinot/query/runtime/QueryRunner.java | 10 +-
.../apache/pinot/query/QueryServerEnclosure.java | 4 +-
.../pinot/server/starter/ServerInstance.java | 9 +-
.../server/starter/helix/BaseServerStarter.java | 17 +--
.../pinot/server/worker/WorkerQueryServer.java | 6 +-
.../accounting/ThreadResourceUsageAccountant.java | 4 +
.../ThrottleOnCriticalHeapUsageExecutor.java | 62 +++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 5 +
.../ThrottleOnCriticalHeapUsageExecutorTest.java | 150 +++++++++++++++++++++
25 files changed, 343 insertions(+), 54 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index aa387807d5b..5f243e03bff 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -249,6 +249,11 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
}
+ @Override
+ public boolean throttleQuerySubmission() {
+ return getWatcherTask().getHeapUsageBytes() >
getWatcherTask().getQueryMonitorConfig().getAlarmingLevel();
+ }
+
@Override
public boolean isAnchorThreadInterrupted() {
ThreadExecutionContext context =
_threadLocalEntry.get().getCurrentThreadTaskStatus();
@@ -632,6 +637,10 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
return _queryMonitorConfig.get();
}
+ public long getHeapUsageBytes() {
+ return _usedBytes;
+ }
+
@Override
public synchronized void onChange(Set<String> changedConfigs,
Map<String, String> clusterConfigs) {
// Filter configs that have CommonConstants.PREFIX_SCHEDULER_PREFIX
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
index d5c97147201..b4bedb3ff64 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
@@ -35,6 +35,7 @@ import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import
org.apache.pinot.core.query.scheduler.resources.BinaryWorkloadResourceManager;
import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.slf4j.Logger;
@@ -73,8 +74,9 @@ public class BinaryWorkloadScheduler extends QueryScheduler {
Thread _scheduler;
public BinaryWorkloadScheduler(PinotConfiguration config, QueryExecutor
queryExecutor, ServerMetrics metrics,
- LongAccumulator latestQueryTime) {
- super(config, queryExecutor, new BinaryWorkloadResourceManager(config),
metrics, latestQueryTime);
+ LongAccumulator latestQueryTime, ThreadResourceUsageAccountant
resourceUsageAccountant) {
+ super(config, queryExecutor, new BinaryWorkloadResourceManager(config,
resourceUsageAccountant), metrics,
+ latestQueryTime);
_secondaryQueryQ = new SecondaryWorkloadQueue(config, _resourceManager);
_numSecondaryRunners = config.getProperty(MAX_SECONDARY_QUERIES,
DEFAULT_MAX_SECONDARY_QUERIES);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
index 69dd8bed365..e7225598732 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.scheduler.fcfs.BoundedFCFSScheduler;
import org.apache.pinot.core.query.scheduler.fcfs.FCFSQueryScheduler;
import
org.apache.pinot.core.query.scheduler.tokenbucket.TokenPriorityScheduler;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.plugin.PluginManager;
import org.slf4j.Logger;
@@ -58,7 +59,8 @@ public class QuerySchedulerFactory {
* @return returns an instance of query scheduler
*/
public static QueryScheduler create(PinotConfiguration schedulerConfig,
QueryExecutor queryExecutor,
- ServerMetrics serverMetrics, LongAccumulator latestQueryTime) {
+ ServerMetrics serverMetrics, LongAccumulator latestQueryTime,
+ ThreadResourceUsageAccountant resourceUsageAccountant) {
Preconditions.checkNotNull(schedulerConfig);
Preconditions.checkNotNull(queryExecutor);
@@ -66,16 +68,20 @@ public class QuerySchedulerFactory {
QueryScheduler scheduler;
switch (schedulerName.toLowerCase()) {
case FCFS_ALGORITHM:
- scheduler = new FCFSQueryScheduler(schedulerConfig, queryExecutor,
serverMetrics, latestQueryTime);
+ scheduler = new FCFSQueryScheduler(schedulerConfig, queryExecutor,
serverMetrics, latestQueryTime,
+ resourceUsageAccountant);
break;
case TOKEN_BUCKET_ALGORITHM:
- scheduler = TokenPriorityScheduler.create(schedulerConfig,
queryExecutor, serverMetrics, latestQueryTime);
+ scheduler = TokenPriorityScheduler.create(schedulerConfig,
queryExecutor, serverMetrics, latestQueryTime,
+ resourceUsageAccountant);
break;
case BOUNDED_FCFS_ALGORITHM:
- scheduler = BoundedFCFSScheduler.create(schedulerConfig,
queryExecutor, serverMetrics, latestQueryTime);
+ scheduler = BoundedFCFSScheduler.create(schedulerConfig,
queryExecutor, serverMetrics, latestQueryTime,
+ resourceUsageAccountant);
break;
case BINARY_WORKLOAD_ALGORITHM:
- scheduler = new BinaryWorkloadScheduler(schedulerConfig,
queryExecutor, serverMetrics, latestQueryTime);
+ scheduler = new BinaryWorkloadScheduler(schedulerConfig,
queryExecutor, serverMetrics, latestQueryTime,
+ resourceUsageAccountant);
break;
default:
scheduler =
@@ -93,7 +99,8 @@ public class QuerySchedulerFactory {
// Failure on bad configuration will cause outage vs an inferior algorithm
that
// will provide degraded service
LOGGER.warn("Scheduler {} not found. Using default FCFS query scheduler",
schedulerName);
- return new FCFSQueryScheduler(schedulerConfig, queryExecutor,
serverMetrics, latestQueryTime);
+ return new FCFSQueryScheduler(schedulerConfig, queryExecutor,
serverMetrics, latestQueryTime,
+ resourceUsageAccountant);
}
@Nullable
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
index 60495ad2f90..30f273e892b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
@@ -29,6 +29,7 @@ import
org.apache.pinot.core.query.scheduler.SchedulerPriorityQueue;
import org.apache.pinot.core.query.scheduler.TableBasedGroupMapper;
import
org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -39,8 +40,9 @@ import org.apache.pinot.spi.env.PinotConfiguration;
*/
public class BoundedFCFSScheduler extends PriorityScheduler {
public static BoundedFCFSScheduler create(PinotConfiguration config,
QueryExecutor queryExecutor,
- ServerMetrics serverMetrics, LongAccumulator latestQueryTime) {
- final ResourceManager rm = new PolicyBasedResourceManager(config);
+ ServerMetrics serverMetrics, LongAccumulator latestQueryTime,
+ ThreadResourceUsageAccountant resourceUsageAccountant) {
+ final ResourceManager rm = new PolicyBasedResourceManager(config,
resourceUsageAccountant);
final SchedulerGroupFactory groupFactory = new SchedulerGroupFactory() {
@Override
public SchedulerGroup create(PinotConfiguration config, String
groupName) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
index d61972d347a..035b795f737 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
@@ -29,6 +29,7 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
import
org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.query.QueryThreadContext;
@@ -41,8 +42,9 @@ import org.apache.pinot.spi.query.QueryThreadContext;
public class FCFSQueryScheduler extends QueryScheduler {
public FCFSQueryScheduler(PinotConfiguration config, QueryExecutor
queryExecutor, ServerMetrics serverMetrics,
- LongAccumulator latestQueryTime) {
- super(config, queryExecutor, new UnboundedResourceManager(config),
serverMetrics, latestQueryTime);
+ LongAccumulator latestQueryTime, ThreadResourceUsageAccountant
resourceUsageAccountant) {
+ super(config, queryExecutor, new UnboundedResourceManager(config,
resourceUsageAccountant), serverMetrics,
+ latestQueryTime);
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
index fb7ffd93e3f..feab2930bdb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +35,9 @@ public class BinaryWorkloadResourceManager extends
ResourceManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(BinaryWorkloadResourceManager.class);
private final ResourceLimitPolicy _secondaryWorkloadPolicy;
- public BinaryWorkloadResourceManager(PinotConfiguration config) {
- super(config);
+ public BinaryWorkloadResourceManager(PinotConfiguration config,
+ ThreadResourceUsageAccountant resourceUsageAccountant) {
+ super(config, resourceUsageAccountant);
_secondaryWorkloadPolicy = new ResourceLimitPolicy(config,
_numQueryWorkerThreads);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
index dd2c942c565..1fa16063469 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.scheduler.resources;
import com.google.common.base.Preconditions;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +35,8 @@ public class PolicyBasedResourceManager extends
ResourceManager {
private final ResourceLimitPolicy _resourcePolicy;
- public PolicyBasedResourceManager(PinotConfiguration config) {
- super(config);
+ public PolicyBasedResourceManager(PinotConfiguration config,
ThreadResourceUsageAccountant resourceUsageAccountant) {
+ super(config, resourceUsageAccountant);
_resourcePolicy = new ResourceLimitPolicy(config, _numQueryWorkerThreads);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
index 9414dc11edd..a705b735459 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
@@ -27,7 +27,9 @@ import java.util.concurrent.ThreadFactory;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.executor.ThrottleOnCriticalHeapUsageExecutor;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +78,7 @@ public abstract class ResourceManager {
/**
* @param config configuration for initializing resource manager
*/
- public ResourceManager(PinotConfiguration config) {
+ public ResourceManager(PinotConfiguration config,
ThreadResourceUsageAccountant resourceUsageAccountant) {
_numQueryRunnerThreads = config.getProperty(QUERY_RUNNER_CONFIG_KEY,
DEFAULT_QUERY_RUNNER_THREADS);
_numQueryWorkerThreads = config.getProperty(QUERY_WORKER_CONFIG_KEY,
DEFAULT_QUERY_WORKER_THREADS);
@@ -85,14 +87,23 @@ public abstract class ResourceManager {
// pqr -> pinot query runner (to give short names)
ThreadFactory queryRunnerFactory = new
TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false,
CommonConstants.ExecutorService.PINOT_QUERY_RUNNER_NAME_FORMAT);
- _queryRunners =
-
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryRunnerThreads,
queryRunnerFactory));
+
+ ExecutorService runnerService =
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
+ if
(config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
+
CommonConstants.Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE))
{
+ runnerService = new ThrottleOnCriticalHeapUsageExecutor(runnerService,
resourceUsageAccountant);
+ }
+ _queryRunners = MoreExecutors.listeningDecorator(runnerService);
// pqw -> pinot query workers
ThreadFactory queryWorkersFactory = new
TracedThreadFactory(Thread.NORM_PRIORITY, false,
CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
- _queryWorkers =
-
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryWorkerThreads,
queryWorkersFactory));
+ ExecutorService workerService =
Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
+ if
(config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
+
CommonConstants.Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE))
{
+ workerService = new ThrottleOnCriticalHeapUsageExecutor(workerService,
resourceUsageAccountant);
+ }
+ _queryWorkers = MoreExecutors.listeningDecorator(workerService);
}
public void stop() {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManager.java
index 075e35dd021..ff0c2eb2835 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.scheduler.resources;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -30,8 +31,8 @@ import org.apache.pinot.spi.env.PinotConfiguration;
*/
public class UnboundedResourceManager extends ResourceManager {
- public UnboundedResourceManager(PinotConfiguration config) {
- super(config);
+ public UnboundedResourceManager(PinotConfiguration config,
ThreadResourceUsageAccountant resourceUsageAccountant) {
+ super(config, resourceUsageAccountant);
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
index cac4c213131..25f02c6de89 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
@@ -28,6 +28,7 @@ import
org.apache.pinot.core.query.scheduler.SchedulerGroupFactory;
import org.apache.pinot.core.query.scheduler.TableBasedGroupMapper;
import
org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -42,8 +43,8 @@ public class TokenPriorityScheduler extends PriorityScheduler
{
private static final int DEFAULT_TOKEN_LIFETIME_MS = 100;
public static TokenPriorityScheduler create(PinotConfiguration config,
QueryExecutor queryExecutor,
- ServerMetrics metrics, LongAccumulator latestQueryTime) {
- final ResourceManager rm = new PolicyBasedResourceManager(config);
+ ServerMetrics metrics, LongAccumulator latestQueryTime,
ThreadResourceUsageAccountant resourceUsageAccountant) {
+ final ResourceManager rm = new PolicyBasedResourceManager(config,
resourceUsageAccountant);
final SchedulerGroupFactory groupFactory = new SchedulerGroupFactory() {
@Override
public SchedulerGroup create(PinotConfiguration config, String
groupName) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index 7ffa5f342d1..c2e2f0cef3c 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -550,7 +550,7 @@ public class ResourceManagerAccountingTest {
private ResourceManager getResourceManager(int runners, int workers, final
int softLimit, final int hardLimit,
Map<String, Object> map) {
- return new ResourceManager(getConfig(runners, workers, map)) {
+ return new ResourceManager(getConfig(runners, workers, map), new
Tracing.DefaultThreadResourceUsageAccountant()) {
@Override
public QueryExecutorService getExecutorService(ServerQueryRequest query,
SchedulerGroupAccountant accountant) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java
index 2006aa7992f..84ab91ed92d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java
@@ -33,6 +33,7 @@ import
org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import
org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.trace.Tracing;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -81,7 +82,8 @@ public class MultiLevelPriorityQueueTest {
PinotConfiguration configuration = new PinotConfiguration(properties);
- ResourceManager rm = new UnboundedResourceManager(configuration);
+ ResourceManager rm =
+ new UnboundedResourceManager(configuration, new
Tracing.DefaultThreadResourceUsageAccountant());
MultiLevelPriorityQueue queue = createQueue(configuration, rm);
queue.put(createQueryRequest(GROUP_ONE, METRICS));
GROUP_FACTORY._groupMap.get(GROUP_ONE).addReservedThreads(rm.getTableThreadsHardLimit());
@@ -129,7 +131,8 @@ public class MultiLevelPriorityQueueTest {
PinotConfiguration configuration = new PinotConfiguration(properties);
- PolicyBasedResourceManager rm = new
PolicyBasedResourceManager(configuration);
+ PolicyBasedResourceManager rm =
+ new PolicyBasedResourceManager(configuration, new
Tracing.DefaultThreadResourceUsageAccountant());
MultiLevelPriorityQueue queue = createQueue(configuration, rm);
queue.put(createQueryRequest(GROUP_ONE, METRICS));
@@ -214,7 +217,7 @@ public class MultiLevelPriorityQueueTest {
private MultiLevelPriorityQueue createQueue() {
PinotConfiguration conf = new PinotConfiguration();
- return createQueue(conf, new UnboundedResourceManager(conf));
+ return createQueue(conf, new UnboundedResourceManager(conf, new
Tracing.DefaultThreadResourceUsageAccountant()));
}
private MultiLevelPriorityQueue createQueue(PinotConfiguration config,
ResourceManager rm) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index 0e7a4d67476..94d91dd80e6 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -52,6 +52,7 @@ import
org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.trace.Tracing;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -241,7 +242,7 @@ public class PrioritySchedulerTest {
}
public static TestPriorityScheduler create(PinotConfiguration config) {
- ResourceManager rm = new PolicyBasedResourceManager(config);
+ ResourceManager rm = new PolicyBasedResourceManager(config, new
Tracing.DefaultThreadResourceUsageAccountant());
QueryExecutor qe = new TestQueryExecutor();
_groupFactory = new TestSchedulerGroupFactory();
MultiLevelPriorityQueue queue =
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
index 9c4c59ea1ef..b28bf4ac8ec 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
@@ -27,7 +27,9 @@ import
org.apache.pinot.core.query.scheduler.fcfs.BoundedFCFSScheduler;
import org.apache.pinot.core.query.scheduler.fcfs.FCFSQueryScheduler;
import
org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
import
org.apache.pinot.core.query.scheduler.tokenbucket.TokenPriorityScheduler;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
@@ -43,25 +45,27 @@ public class QuerySchedulerFactoryTest {
LongAccumulator latestQueryTime = mock(LongAccumulator.class);
PinotConfiguration config = new PinotConfiguration();
+ ThreadResourceUsageAccountant accountant = new
Tracing.DefaultThreadResourceUsageAccountant();
config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY,
QuerySchedulerFactory.FCFS_ALGORITHM);
- QueryScheduler queryScheduler = QuerySchedulerFactory.create(config,
queryExecutor, serverMetrics, latestQueryTime);
+ QueryScheduler queryScheduler =
+ QuerySchedulerFactory.create(config, queryExecutor, serverMetrics,
latestQueryTime, accountant);
assertTrue(queryScheduler instanceof FCFSQueryScheduler);
config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY,
QuerySchedulerFactory.TOKEN_BUCKET_ALGORITHM);
- queryScheduler = QuerySchedulerFactory.create(config, queryExecutor,
serverMetrics, latestQueryTime);
+ queryScheduler = QuerySchedulerFactory.create(config, queryExecutor,
serverMetrics, latestQueryTime, accountant);
assertTrue(queryScheduler instanceof TokenPriorityScheduler);
config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY,
QuerySchedulerFactory.BOUNDED_FCFS_ALGORITHM);
- queryScheduler = QuerySchedulerFactory.create(config, queryExecutor,
serverMetrics, latestQueryTime);
+ queryScheduler = QuerySchedulerFactory.create(config, queryExecutor,
serverMetrics, latestQueryTime, accountant);
assertTrue(queryScheduler instanceof BoundedFCFSScheduler);
config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY,
QuerySchedulerFactory.BINARY_WORKLOAD_ALGORITHM);
- queryScheduler = QuerySchedulerFactory.create(config, queryExecutor,
serverMetrics, latestQueryTime);
+ queryScheduler = QuerySchedulerFactory.create(config, queryExecutor,
serverMetrics, latestQueryTime, accountant);
assertTrue(queryScheduler instanceof BinaryWorkloadScheduler);
config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY,
TestQueryScheduler.class.getName());
- queryScheduler = QuerySchedulerFactory.create(config, queryExecutor,
serverMetrics, latestQueryTime);
+ queryScheduler = QuerySchedulerFactory.create(config, queryExecutor,
serverMetrics, latestQueryTime, accountant);
assertTrue(queryScheduler instanceof TestQueryScheduler);
}
@@ -69,7 +73,9 @@ public class QuerySchedulerFactoryTest {
public TestQueryScheduler(PinotConfiguration config, QueryExecutor
queryExecutor, ServerMetrics serverMetrics,
LongAccumulator latestQueryTime) {
- super(config, queryExecutor, new UnboundedResourceManager(config),
serverMetrics, latestQueryTime);
+ super(config, queryExecutor,
+ new UnboundedResourceManager(config, new
Tracing.DefaultThreadResourceUsageAccountant()), serverMetrics,
+ latestQueryTime);
}
@Override
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
index 6e6b01cf431..23f7b9f7b1f 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
@@ -48,7 +49,7 @@ public class ResourceManagerTest {
private ResourceManager getResourceManager(int runners, int workers, final
int softLimit, final int hardLimit) {
- return new ResourceManager(getConfig(runners, workers)) {
+ return new ResourceManager(getConfig(runners, workers), new
Tracing.DefaultThreadResourceUsageAccountant()) {
@Override
public QueryExecutorService getExecutorService(ServerQueryRequest query,
SchedulerGroupAccountant accountant) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManagerTest.java
index 7031a8ea227..0baeec19e52 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManagerTest.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
@@ -35,7 +36,8 @@ public class UnboundedResourceManagerTest {
@Test
public void testDefault() {
- UnboundedResourceManager rm = new UnboundedResourceManager(new
PinotConfiguration());
+ UnboundedResourceManager rm =
+ new UnboundedResourceManager(new PinotConfiguration(), new
Tracing.DefaultThreadResourceUsageAccountant());
assertTrue(rm.getNumQueryRunnerThreads() > 1);
assertTrue(rm.getNumQueryWorkerThreads() >= 1);
assertEquals(rm.getTableThreadsHardLimit(), rm.getNumQueryRunnerThreads()
+ rm.getNumQueryWorkerThreads());
@@ -51,7 +53,8 @@ public class UnboundedResourceManagerTest {
properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, runners);
properties.put(ResourceManager.QUERY_WORKER_CONFIG_KEY, workers);
- UnboundedResourceManager rm = new UnboundedResourceManager(new
PinotConfiguration(properties));
+ UnboundedResourceManager rm = new UnboundedResourceManager(new
PinotConfiguration(properties),
+ new Tracing.DefaultThreadResourceUsageAccountant());
assertEquals(rm.getNumQueryWorkerThreads(), workers);
assertEquals(rm.getNumQueryRunnerThreads(), runners);
assertEquals(rm.getTableThreadsHardLimit(), runners + workers);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index d98f919f29e..054a80ef14d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -75,10 +75,12 @@ import
org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesServerPlanVis
import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
import org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.executor.ExecutorServiceUtils;
import org.apache.pinot.spi.executor.HardLimitExecutor;
import org.apache.pinot.spi.executor.MetricsExecutor;
+import org.apache.pinot.spi.executor.ThrottleOnCriticalHeapUsageExecutor;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
@@ -144,7 +146,7 @@ public class QueryRunner {
* <p>Should be called only once and before calling any other method.
*/
public void init(PinotConfiguration serverConf, InstanceDataManager
instanceDataManager,
- @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats) {
+ @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats,
ThreadResourceUsageAccountant resourceUsageAccountant) {
String hostname =
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
if (hostname.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
hostname =
hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
@@ -208,6 +210,12 @@ public class QueryRunner {
_executorService = new HardLimitExecutor(hardLimit, _executorService);
}
+ if
(serverConf.getProperty(Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
+ Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE)) {
+ LOGGER.info("Enable OOM Throttling on critical heap usage for
multi-stage executor");
+ _executorService = new
ThrottleOnCriticalHeapUsageExecutor(_executorService, resourceUsageAccountant);
+ }
+
_opChainScheduler = new OpChainSchedulerService(_executorService,
serverConf);
_mailboxService = new MailboxService(hostname, port, serverConf,
tlsConfig);
try {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 43e99f1c0cb..5df708551be 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -30,6 +30,7 @@ import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -61,7 +62,8 @@ public class QueryServerEnclosure {
runnerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
_queryRunnerPort);
InstanceDataManager instanceDataManager =
factory.buildInstanceDataManager();
_queryRunner = new QueryRunner();
- _queryRunner.init(new PinotConfiguration(runnerConfig),
instanceDataManager, null, () -> true);
+ _queryRunner.init(new PinotConfiguration(runnerConfig),
instanceDataManager, null, () -> true,
+ new Tracing.DefaultThreadResourceUsageAccountant());
}
public int getPort() {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 2ba34b61bdc..516fb720fe1 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -51,6 +51,7 @@ import org.apache.pinot.server.access.AllowAllAccessFactory;
import org.apache.pinot.server.conf.ServerConf;
import org.apache.pinot.server.starter.helix.SendStatsPredicate;
import org.apache.pinot.server.worker.WorkerQueryServer;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
@@ -85,7 +86,8 @@ public class ServerInstance {
private boolean _queryServerStarted = false;
public ServerInstance(ServerConf serverConf, HelixManager helixManager,
AccessControlFactory accessControlFactory,
- @Nullable SegmentOperationsThrottler segmentOperationsThrottler,
SendStatsPredicate sendStatsPredicate)
+ @Nullable SegmentOperationsThrottler segmentOperationsThrottler,
SendStatsPredicate sendStatsPredicate,
+ ThreadResourceUsageAccountant resourceUsageAccountant)
throws Exception {
LOGGER.info("Initializing server instance");
_helixManager = helixManager;
@@ -122,7 +124,8 @@ public class ServerInstance {
LOGGER.info("Initializing query scheduler");
_latestQueryTime = new LongAccumulator(Long::max, 0);
_queryScheduler =
- QuerySchedulerFactory.create(serverConf.getSchedulerConfig(),
_queryExecutor, _serverMetrics, _latestQueryTime);
+ QuerySchedulerFactory.create(serverConf.getSchedulerConfig(),
_queryExecutor, _serverMetrics, _latestQueryTime,
+ resourceUsageAccountant);
TlsConfig tlsConfig =
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
CommonConstants.Server.SERVER_TLS_PREFIX);
@@ -135,7 +138,7 @@ public class ServerInstance {
if (serverConf.isMultiStageServerEnabled()) {
LOGGER.info("Initializing Multi-stage query engine");
_workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(),
_instanceDataManager,
- serverConf.isMultiStageEngineTlsEnabled() ? tlsConfig : null,
sendStatsPredicate);
+ serverConf.isMultiStageEngineTlsEnabled() ? tlsConfig : null,
sendStatsPredicate, resourceUsageAccountant);
} else {
_workerQueryServer = null;
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index fa8b8fa6bc9..044d33bf7cb 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -663,14 +663,6 @@ public abstract class BaseServerStarter implements
ServiceStartable {
segmentDownloadThrottler,
segmentMultiColTextIndexPreprocessThrottler);
}
- SendStatsPredicate sendStatsPredicate =
SendStatsPredicate.create(_serverConf, _helixManager);
- ServerConf serverConf = new ServerConf(_serverConf);
- _serverInstance = new ServerInstance(serverConf, _helixManager,
_accessControlFactory, _segmentOperationsThrottler,
- sendStatsPredicate);
- ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
-
- InstanceDataManager instanceDataManager =
_serverInstance.getInstanceDataManager();
- instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() ->
_isServerReadyToServeQueries);
// initialize the thread accountant for query killing
Tracing.ThreadAccountantOps.initializeThreadAccountant(
_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX),
_instanceId,
@@ -680,6 +672,15 @@ public abstract class BaseServerStarter implements
ServiceStartable {
Tracing.getThreadAccountant().getClusterConfigChangeListener());
}
+ SendStatsPredicate sendStatsPredicate =
SendStatsPredicate.create(_serverConf, _helixManager);
+ ServerConf serverConf = new ServerConf(_serverConf);
+ _serverInstance = new ServerInstance(serverConf, _helixManager,
_accessControlFactory, _segmentOperationsThrottler,
+ sendStatsPredicate, Tracing.getThreadAccountant());
+ ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
+
+ InstanceDataManager instanceDataManager =
_serverInstance.getInstanceDataManager();
+ instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() ->
_isServerReadyToServeQueries);
+
initSegmentFetcher(_serverConf);
StateModelFactory<?> stateModelFactory =
new SegmentOnlineOfflineStateModelFactory(_instanceId,
instanceDataManager);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index f8372d9960c..9528ec79af0 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -24,6 +24,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.service.server.QueryServer;
import org.apache.pinot.server.starter.helix.SendStatsPredicate;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
@@ -34,13 +35,14 @@ public class WorkerQueryServer {
private final QueryServer _queryWorkerService;
public WorkerQueryServer(PinotConfiguration serverConf, InstanceDataManager
instanceDataManager,
- @Nullable TlsConfig tlsConfig, SendStatsPredicate sendStats) {
+ @Nullable TlsConfig tlsConfig, SendStatsPredicate sendStats,
+ ThreadResourceUsageAccountant resourceUsageAccountant) {
serverConf = toWorkerQueryConfig(serverConf);
String instanceId =
serverConf.getProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_ID);
_queryServicePort =
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
QueryRunner queryRunner = new QueryRunner();
- queryRunner.init(serverConf, instanceDataManager, tlsConfig,
sendStats::isSendStats);
+ queryRunner.init(serverConf, instanceDataManager, tlsConfig,
sendStats::isSendStats, resourceUsageAccountant);
_queryWorkerService = new QueryServer(instanceId, _queryServicePort,
queryRunner, tlsConfig, serverConf);
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index b3a270aea1e..1d6d590e0fa 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -84,6 +84,10 @@ public interface ThreadResourceUsageAccountant {
*/
void sampleUsageMSE();
+ /**
+ * Reports whether submission of new query tasks should currently be throttled.
+ * This default implementation never throttles: it always returns {@code false}.
+ *
+ * @return {@code true} if new query tasks should be throttled, {@code false} otherwise
+ */
+ default boolean throttleQuerySubmission() {
+ return false;
+ }
+
/**
* special interface to aggregate usage to the stats store only once, it is
used for response
* ser/de threads where the thread execution context cannot be setup before
hands as
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java
new file mode 100644
index 00000000000..091fb5e3fe0
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.executor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+
+
+/**
+ * An executor that throttles task submission when the heap usage is critical.
+ *
+ * <p>The heap usage level is obtained from
+ * {@link ThreadResourceUsageAccountant#throttleQuerySubmission()}. The accountant is consulted twice for
+ * every task: once when the task is decorated and once more right before the decorated task starts
+ * running, so a task that was accepted earlier is still rejected if the accountant starts throttling
+ * before the task actually runs.
+ */
+public class ThrottleOnCriticalHeapUsageExecutor extends DecoratorExecutorService {
+  // Assigned once in the constructor and never exposed, hence private and final.
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+
+  /**
+   * Wraps {@code executorService} so that tasks are rejected while the accountant asks for throttling.
+   *
+   * @param executorService the executor that runs the tasks which pass the check
+   * @param threadResourceUsageAccountant the accountant that decides whether tasks must be throttled
+   */
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    super(executorService);
+    _threadResourceUsageAccountant = threadResourceUsageAccountant;
+  }
+
+  /**
+   * Fails fast when the accountant reports that query submission must be throttled.
+   *
+   * @throws IllegalStateException if {@link ThreadResourceUsageAccountant#throttleQuerySubmission()}
+   *     returns {@code true}
+   */
+  protected void checkTaskAllowed() {
+    if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
+      throw new IllegalStateException("Tasks throttled due to high heap usage.");
+    }
+  }
+
+  /**
+   * Checks the throttle immediately and returns a callable that checks it again before delegating.
+   *
+   * @throws IllegalStateException if tasks are throttled at decoration time
+   */
+  @Override
+  protected <T> Callable<T> decorate(Callable<T> task) {
+    checkTaskAllowed();
+    return () -> {
+      // Re-check: the accountant's answer may have changed since the task was decorated.
+      checkTaskAllowed();
+      return task.call();
+    };
+  }
+
+  /**
+   * Checks the throttle immediately and returns a runnable that checks it again before delegating.
+   *
+   * @throws IllegalStateException if tasks are throttled at decoration time
+   */
+  @Override
+  protected Runnable decorate(Runnable task) {
+    checkTaskAllowed();
+    return () -> {
+      // Re-check: the accountant's answer may have changed since the task was decorated.
+      checkTaskAllowed();
+      task.run();
+    };
+  }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3a2b703edc1..b8a4d16152e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1099,6 +1099,11 @@ public class CommonConstants {
@Deprecated
public static final String DEFAULT_QUERY_EXECUTOR_OPCHAIN_EXECUTOR =
DEFAULT_MULTISTAGE_EXECUTOR_TYPE;
+ // Enable SSE & MSE task throttling on critical heap usage.
+ // The key is built from QUERY_EXECUTOR_CONFIG_PREFIX; throttling is disabled by default.
+ public static final String
CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE =
+ QUERY_EXECUTOR_CONFIG_PREFIX + ".enableThrottlingOnHeapUsage";
+ public static final boolean
DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE = false;
+
/**
* The ExecutorServiceProvider to be used for timeseries threads.
*
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
new file mode 100644
index 00000000000..82bc2e808df
--- /dev/null
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.executor;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+
+/**
+ * Verifies that {@link ThrottleOnCriticalHeapUsageExecutor} rejects a task once the accountant
+ * starts reporting that query submission must be throttled.
+ */
+public class ThrottleOnCriticalHeapUsageExecutorTest {
+ @Test
+ void testThrottle() throws Exception {
+ // Stub accountant: every method is a no-op except throttleQuerySubmission().
+ ThreadResourceUsageAccountant accountant = new
ThreadResourceUsageAccountant() {
+ // Counts how many times throttleQuerySubmission() has been consulted.
+ final AtomicLong _numCalls = new AtomicLong(0);
+ @Override
+ public void clear() {
+ }
+
+ @Override
+ public boolean isAnchorThreadInterrupted() {
+ return false;
+ }
+
+ @Override
+ public void createExecutionContext(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
+ @Nullable ThreadExecutionContext parentContext) {
+ }
+
+ @Override
+ public void setupRunner(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType) {
+ }
+
+ @Override
+ public void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
+ @Nullable ThreadExecutionContext parentContext) {
+ }
+
+ @Nullable
+ @Override
+ public ThreadExecutionContext getThreadExecutionContext() {
+ return null;
+ }
+
+ @Override
+ public void setThreadResourceUsageProvider(ThreadResourceUsageProvider
threadResourceUsageProvider) {
+ }
+
+ @Override
+ public void sampleUsage() {
+ }
+
+ @Override
+ public void sampleUsageMSE() {
+ }
+
+ // The first two calls return false; the third and every later call return true.
+ @Override
+ public boolean throttleQuerySubmission() {
+ return _numCalls.getAndIncrement() > 1;
+ }
+
+ @Override
+ public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs,
long allocatedBytes) {
+ }
+
+ @Override
+ public void updateQueryUsageConcurrently(String queryId) {
+ }
+
+ @Override
+ public void startWatcherTask() {
+ }
+
+ @Override
+ public Exception getErrorStatus() {
+ return null;
+ }
+
+ @Override
+ public Collection<? extends ThreadResourceTracker> getThreadResources() {
+ return List.of();
+ }
+
+ @Override
+ public Map<String, ? extends QueryResourceTracker> getQueryResources() {
+ return Map.of();
+ }
+ };
+
+ ThrottleOnCriticalHeapUsageExecutor executor = new
ThrottleOnCriticalHeapUsageExecutor(
+ Executors.newCachedThreadPool(), accountant);
+
+ // Two parties: the test thread and the first submitted task.
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ try {
+ // The executor checks the accountant once when decorating the task and once before running it,
+ // so this first task uses up both non-throttled answers of the stub.
+ executor.execute(() -> {
+ try {
+ barrier.await();
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException | BrokenBarrierException e) {
+ // do nothing
+ }
+ });
+
+ // Wait until the first task is running, i.e. its pre-run check has already been made.
+ barrier.await();
+ try {
+ // This is the third check against the stub, so the executor is expected to reject the task.
+ executor.execute(() -> {
+ // do nothing
+ });
+ fail("Should not allow more than 1 task");
+ } catch (Exception e) {
+ // as expected
+ assertEquals(e.getMessage(), "Tasks throttled due to high heap
usage.");
+ }
+ } catch (BrokenBarrierException | InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ // Stop the executor so the sleeping first task does not outlive the test
+ // (presumably shutdownNow() is forwarded to the wrapped pool and interrupts it - TODO confirm).
+ executor.shutdownNow();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]