This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d40731f9729 Throttle SSE & MSE Tasks if Server heap usage is above a threshold (#16271)
d40731f9729 is described below

commit d40731f9729724eada870b03871295cc12c7e690
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Fri Jul 4 12:00:20 2025 +0530

    Throttle SSE & MSE Tasks if Server heap usage is above a threshold (#16271)
---
 .../PerQueryCPUMemAccountantFactory.java           |   9 ++
 .../query/scheduler/BinaryWorkloadScheduler.java   |   6 +-
 .../query/scheduler/QuerySchedulerFactory.java     |  19 ++-
 .../query/scheduler/fcfs/BoundedFCFSScheduler.java |   6 +-
 .../query/scheduler/fcfs/FCFSQueryScheduler.java   |   6 +-
 .../resources/BinaryWorkloadResourceManager.java   |   6 +-
 .../resources/PolicyBasedResourceManager.java      |   5 +-
 .../query/scheduler/resources/ResourceManager.java |  21 ++-
 .../resources/UnboundedResourceManager.java        |   5 +-
 .../tokenbucket/TokenPriorityScheduler.java        |   5 +-
 .../accounting/ResourceManagerAccountingTest.java  |   2 +-
 .../scheduler/MultiLevelPriorityQueueTest.java     |   9 +-
 .../query/scheduler/PrioritySchedulerTest.java     |   3 +-
 .../query/scheduler/QuerySchedulerFactoryTest.java |  18 ++-
 .../scheduler/resources/ResourceManagerTest.java   |   3 +-
 .../resources/UnboundedResourceManagerTest.java    |   7 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |  10 +-
 .../apache/pinot/query/QueryServerEnclosure.java   |   4 +-
 .../pinot/server/starter/ServerInstance.java       |   9 +-
 .../server/starter/helix/BaseServerStarter.java    |  17 +--
 .../pinot/server/worker/WorkerQueryServer.java     |   6 +-
 .../accounting/ThreadResourceUsageAccountant.java  |   4 +
 .../ThrottleOnCriticalHeapUsageExecutor.java       |  62 +++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   5 +
 .../ThrottleOnCriticalHeapUsageExecutorTest.java   | 150 +++++++++++++++++++++
 25 files changed, 343 insertions(+), 54 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index aa387807d5b..5f243e03bff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -249,6 +249,11 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
       }
     }
 
+    @Override
+    public boolean throttleQuerySubmission() {
+      return getWatcherTask().getHeapUsageBytes() > getWatcherTask().getQueryMonitorConfig().getAlarmingLevel();
+    }
+
     @Override
     public boolean isAnchorThreadInterrupted() {
       ThreadExecutionContext context = _threadLocalEntry.get().getCurrentThreadTaskStatus();
@@ -632,6 +637,10 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
         return _queryMonitorConfig.get();
       }
 
+      public long getHeapUsageBytes() {
+        return _usedBytes;
+      }
+
       @Override
       public synchronized void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
         // Filter configs that have CommonConstants.PREFIX_SCHEDULER_PREFIX
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
index d5c97147201..b4bedb3ff64 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java
@@ -35,6 +35,7 @@ import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import 
org.apache.pinot.core.query.scheduler.resources.BinaryWorkloadResourceManager;
 import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.slf4j.Logger;
@@ -73,8 +74,9 @@ public class BinaryWorkloadScheduler extends QueryScheduler {
   Thread _scheduler;
 
   public BinaryWorkloadScheduler(PinotConfiguration config, QueryExecutor 
queryExecutor, ServerMetrics metrics,
-      LongAccumulator latestQueryTime) {
-    super(config, queryExecutor, new BinaryWorkloadResourceManager(config), 
metrics, latestQueryTime);
+      LongAccumulator latestQueryTime, ThreadResourceUsageAccountant 
resourceUsageAccountant) {
+    super(config, queryExecutor, new BinaryWorkloadResourceManager(config, 
resourceUsageAccountant), metrics,
+        latestQueryTime);
 
     _secondaryQueryQ = new SecondaryWorkloadQueue(config, _resourceManager);
     _numSecondaryRunners = config.getProperty(MAX_SECONDARY_QUERIES, 
DEFAULT_MAX_SECONDARY_QUERIES);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
index 69dd8bed365..e7225598732 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.scheduler.fcfs.BoundedFCFSScheduler;
 import org.apache.pinot.core.query.scheduler.fcfs.FCFSQueryScheduler;
 import 
org.apache.pinot.core.query.scheduler.tokenbucket.TokenPriorityScheduler;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.slf4j.Logger;
@@ -58,7 +59,8 @@ public class QuerySchedulerFactory {
    * @return returns an instance of query scheduler
    */
   public static QueryScheduler create(PinotConfiguration schedulerConfig, 
QueryExecutor queryExecutor,
-      ServerMetrics serverMetrics, LongAccumulator latestQueryTime) {
+      ServerMetrics serverMetrics, LongAccumulator latestQueryTime,
+      ThreadResourceUsageAccountant resourceUsageAccountant) {
     Preconditions.checkNotNull(schedulerConfig);
     Preconditions.checkNotNull(queryExecutor);
 
@@ -66,16 +68,20 @@ public class QuerySchedulerFactory {
     QueryScheduler scheduler;
     switch (schedulerName.toLowerCase()) {
       case FCFS_ALGORITHM:
-        scheduler = new FCFSQueryScheduler(schedulerConfig, queryExecutor, 
serverMetrics, latestQueryTime);
+        scheduler = new FCFSQueryScheduler(schedulerConfig, queryExecutor, 
serverMetrics, latestQueryTime,
+            resourceUsageAccountant);
         break;
       case TOKEN_BUCKET_ALGORITHM:
-        scheduler = TokenPriorityScheduler.create(schedulerConfig, 
queryExecutor, serverMetrics, latestQueryTime);
+        scheduler = TokenPriorityScheduler.create(schedulerConfig, 
queryExecutor, serverMetrics, latestQueryTime,
+            resourceUsageAccountant);
         break;
       case BOUNDED_FCFS_ALGORITHM:
-        scheduler = BoundedFCFSScheduler.create(schedulerConfig, 
queryExecutor, serverMetrics, latestQueryTime);
+        scheduler = BoundedFCFSScheduler.create(schedulerConfig, 
queryExecutor, serverMetrics, latestQueryTime,
+            resourceUsageAccountant);
         break;
       case BINARY_WORKLOAD_ALGORITHM:
-        scheduler = new BinaryWorkloadScheduler(schedulerConfig, 
queryExecutor, serverMetrics, latestQueryTime);
+        scheduler = new BinaryWorkloadScheduler(schedulerConfig, 
queryExecutor, serverMetrics, latestQueryTime,
+            resourceUsageAccountant);
         break;
       default:
         scheduler =
@@ -93,7 +99,8 @@ public class QuerySchedulerFactory {
     // Failure on bad configuration will cause outage vs an inferior algorithm 
that
     // will provide degraded service
     LOGGER.warn("Scheduler {} not found. Using default FCFS query scheduler", 
schedulerName);
-    return new FCFSQueryScheduler(schedulerConfig, queryExecutor, 
serverMetrics, latestQueryTime);
+    return new FCFSQueryScheduler(schedulerConfig, queryExecutor, 
serverMetrics, latestQueryTime,
+        resourceUsageAccountant);
   }
 
   @Nullable
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
index 60495ad2f90..30f273e892b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
@@ -29,6 +29,7 @@ import 
org.apache.pinot.core.query.scheduler.SchedulerPriorityQueue;
 import org.apache.pinot.core.query.scheduler.TableBasedGroupMapper;
 import 
org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -39,8 +40,9 @@ import org.apache.pinot.spi.env.PinotConfiguration;
  */
 public class BoundedFCFSScheduler extends PriorityScheduler {
   public static BoundedFCFSScheduler create(PinotConfiguration config, 
QueryExecutor queryExecutor,
-      ServerMetrics serverMetrics, LongAccumulator latestQueryTime) {
-    final ResourceManager rm = new PolicyBasedResourceManager(config);
+      ServerMetrics serverMetrics, LongAccumulator latestQueryTime,
+      ThreadResourceUsageAccountant resourceUsageAccountant) {
+    final ResourceManager rm = new PolicyBasedResourceManager(config, 
resourceUsageAccountant);
     final SchedulerGroupFactory groupFactory = new SchedulerGroupFactory() {
       @Override
       public SchedulerGroup create(PinotConfiguration config, String 
groupName) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
index d61972d347a..035b795f737 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
@@ -29,6 +29,7 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
 import 
org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.query.QueryThreadContext;
 
@@ -41,8 +42,9 @@ import org.apache.pinot.spi.query.QueryThreadContext;
 public class FCFSQueryScheduler extends QueryScheduler {
 
   public FCFSQueryScheduler(PinotConfiguration config, QueryExecutor 
queryExecutor, ServerMetrics serverMetrics,
-      LongAccumulator latestQueryTime) {
-    super(config, queryExecutor, new UnboundedResourceManager(config), 
serverMetrics, latestQueryTime);
+      LongAccumulator latestQueryTime, ThreadResourceUsageAccountant 
resourceUsageAccountant) {
+    super(config, queryExecutor, new UnboundedResourceManager(config, 
resourceUsageAccountant), serverMetrics,
+        latestQueryTime);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
index fb7ffd93e3f..feab2930bdb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,8 +35,9 @@ public class BinaryWorkloadResourceManager extends 
ResourceManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BinaryWorkloadResourceManager.class);
   private final ResourceLimitPolicy _secondaryWorkloadPolicy;
 
-  public BinaryWorkloadResourceManager(PinotConfiguration config) {
-    super(config);
+  public BinaryWorkloadResourceManager(PinotConfiguration config,
+      ThreadResourceUsageAccountant resourceUsageAccountant) {
+    super(config, resourceUsageAccountant);
     _secondaryWorkloadPolicy = new ResourceLimitPolicy(config, 
_numQueryWorkerThreads);
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
index dd2c942c565..1fa16063469 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.scheduler.resources;
 import com.google.common.base.Preconditions;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,8 +35,8 @@ public class PolicyBasedResourceManager extends 
ResourceManager {
 
   private final ResourceLimitPolicy _resourcePolicy;
 
-  public PolicyBasedResourceManager(PinotConfiguration config) {
-    super(config);
+  public PolicyBasedResourceManager(PinotConfiguration config, 
ThreadResourceUsageAccountant resourceUsageAccountant) {
+    super(config, resourceUsageAccountant);
     _resourcePolicy = new ResourceLimitPolicy(config, _numQueryWorkerThreads);
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
index 9414dc11edd..a705b735459 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
@@ -27,7 +27,9 @@ import java.util.concurrent.ThreadFactory;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
 import org.apache.pinot.core.util.trace.TracedThreadFactory;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.executor.ThrottleOnCriticalHeapUsageExecutor;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +78,7 @@ public abstract class ResourceManager {
   /**
    * @param config configuration for initializing resource manager
    */
-  public ResourceManager(PinotConfiguration config) {
+  public ResourceManager(PinotConfiguration config, 
ThreadResourceUsageAccountant resourceUsageAccountant) {
     _numQueryRunnerThreads = config.getProperty(QUERY_RUNNER_CONFIG_KEY, 
DEFAULT_QUERY_RUNNER_THREADS);
     _numQueryWorkerThreads = config.getProperty(QUERY_WORKER_CONFIG_KEY, 
DEFAULT_QUERY_WORKER_THREADS);
 
@@ -85,14 +87,23 @@ public abstract class ResourceManager {
     // pqr -> pinot query runner (to give short names)
     ThreadFactory queryRunnerFactory = new 
TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false,
         CommonConstants.ExecutorService.PINOT_QUERY_RUNNER_NAME_FORMAT);
-    _queryRunners =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory));
+
+    ExecutorService runnerService = 
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
+    if (config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
+        CommonConstants.Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE)) {
+      runnerService = new ThrottleOnCriticalHeapUsageExecutor(runnerService, 
resourceUsageAccountant);
+    }
+    _queryRunners = MoreExecutors.listeningDecorator(runnerService);
 
     // pqw -> pinot query workers
     ThreadFactory queryWorkersFactory = new 
TracedThreadFactory(Thread.NORM_PRIORITY, false,
         CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
-    _queryWorkers =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory));
+    ExecutorService workerService = 
Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
+    if (config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
+        CommonConstants.Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE)) {
+      workerService = new ThrottleOnCriticalHeapUsageExecutor(workerService, 
resourceUsageAccountant);
+    }
+    _queryWorkers = MoreExecutors.listeningDecorator(workerService);
   }
 
   public void stop() {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManager.java
index 075e35dd021..ff0c2eb2835 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.scheduler.resources;
 
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -30,8 +31,8 @@ import org.apache.pinot.spi.env.PinotConfiguration;
  */
 public class UnboundedResourceManager extends ResourceManager {
 
-  public UnboundedResourceManager(PinotConfiguration config) {
-    super(config);
+  public UnboundedResourceManager(PinotConfiguration config, 
ThreadResourceUsageAccountant resourceUsageAccountant) {
+    super(config, resourceUsageAccountant);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
index cac4c213131..25f02c6de89 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
@@ -28,6 +28,7 @@ import 
org.apache.pinot.core.query.scheduler.SchedulerGroupFactory;
 import org.apache.pinot.core.query.scheduler.TableBasedGroupMapper;
 import 
org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -42,8 +43,8 @@ public class TokenPriorityScheduler extends PriorityScheduler 
{
   private static final int DEFAULT_TOKEN_LIFETIME_MS = 100;
 
   public static TokenPriorityScheduler create(PinotConfiguration config, 
QueryExecutor queryExecutor,
-      ServerMetrics metrics, LongAccumulator latestQueryTime) {
-    final ResourceManager rm = new PolicyBasedResourceManager(config);
+      ServerMetrics metrics, LongAccumulator latestQueryTime, 
ThreadResourceUsageAccountant resourceUsageAccountant) {
+    final ResourceManager rm = new PolicyBasedResourceManager(config, 
resourceUsageAccountant);
     final SchedulerGroupFactory groupFactory = new SchedulerGroupFactory() {
       @Override
       public SchedulerGroup create(PinotConfiguration config, String 
groupName) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index 7ffa5f342d1..c2e2f0cef3c 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -550,7 +550,7 @@ public class ResourceManagerAccountingTest {
   private ResourceManager getResourceManager(int runners, int workers, final 
int softLimit, final int hardLimit,
       Map<String, Object> map) {
 
-    return new ResourceManager(getConfig(runners, workers, map)) {
+    return new ResourceManager(getConfig(runners, workers, map), new 
Tracing.DefaultThreadResourceUsageAccountant()) {
 
       @Override
       public QueryExecutorService getExecutorService(ServerQueryRequest query, 
SchedulerGroupAccountant accountant) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java
index 2006aa7992f..84ab91ed92d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java
@@ -33,6 +33,7 @@ import 
org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import 
org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.trace.Tracing;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -81,7 +82,8 @@ public class MultiLevelPriorityQueueTest {
 
     PinotConfiguration configuration = new PinotConfiguration(properties);
 
-    ResourceManager rm = new UnboundedResourceManager(configuration);
+    ResourceManager rm =
+        new UnboundedResourceManager(configuration, new 
Tracing.DefaultThreadResourceUsageAccountant());
     MultiLevelPriorityQueue queue = createQueue(configuration, rm);
     queue.put(createQueryRequest(GROUP_ONE, METRICS));
     
GROUP_FACTORY._groupMap.get(GROUP_ONE).addReservedThreads(rm.getTableThreadsHardLimit());
@@ -129,7 +131,8 @@ public class MultiLevelPriorityQueueTest {
 
     PinotConfiguration configuration = new PinotConfiguration(properties);
 
-    PolicyBasedResourceManager rm = new 
PolicyBasedResourceManager(configuration);
+    PolicyBasedResourceManager rm =
+        new PolicyBasedResourceManager(configuration, new 
Tracing.DefaultThreadResourceUsageAccountant());
     MultiLevelPriorityQueue queue = createQueue(configuration, rm);
 
     queue.put(createQueryRequest(GROUP_ONE, METRICS));
@@ -214,7 +217,7 @@ public class MultiLevelPriorityQueueTest {
 
   private MultiLevelPriorityQueue createQueue() {
     PinotConfiguration conf = new PinotConfiguration();
-    return createQueue(conf, new UnboundedResourceManager(conf));
+    return createQueue(conf, new UnboundedResourceManager(conf, new 
Tracing.DefaultThreadResourceUsageAccountant()));
   }
 
   private MultiLevelPriorityQueue createQueue(PinotConfiguration config, 
ResourceManager rm) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index 0e7a4d67476..94d91dd80e6 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -52,6 +52,7 @@ import 
org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.trace.Tracing;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
@@ -241,7 +242,7 @@ public class PrioritySchedulerTest {
     }
 
     public static TestPriorityScheduler create(PinotConfiguration config) {
-      ResourceManager rm = new PolicyBasedResourceManager(config);
+      ResourceManager rm = new PolicyBasedResourceManager(config, new 
Tracing.DefaultThreadResourceUsageAccountant());
       QueryExecutor qe = new TestQueryExecutor();
       _groupFactory = new TestSchedulerGroupFactory();
       MultiLevelPriorityQueue queue =
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
index 9c4c59ea1ef..b28bf4ac8ec 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java
@@ -27,7 +27,9 @@ import 
org.apache.pinot.core.query.scheduler.fcfs.BoundedFCFSScheduler;
 import org.apache.pinot.core.query.scheduler.fcfs.FCFSQueryScheduler;
 import 
org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
 import 
org.apache.pinot.core.query.scheduler.tokenbucket.TokenPriorityScheduler;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -43,25 +45,27 @@ public class QuerySchedulerFactoryTest {
     LongAccumulator latestQueryTime = mock(LongAccumulator.class);
 
     PinotConfiguration config = new PinotConfiguration();
+    ThreadResourceUsageAccountant accountant = new 
Tracing.DefaultThreadResourceUsageAccountant();
     config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, 
QuerySchedulerFactory.FCFS_ALGORITHM);
-    QueryScheduler queryScheduler = QuerySchedulerFactory.create(config, 
queryExecutor, serverMetrics, latestQueryTime);
+    QueryScheduler queryScheduler =
+        QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, 
latestQueryTime, accountant);
     assertTrue(queryScheduler instanceof FCFSQueryScheduler);
 
     config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, 
QuerySchedulerFactory.TOKEN_BUCKET_ALGORITHM);
-    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, 
serverMetrics, latestQueryTime);
+    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, 
serverMetrics, latestQueryTime, accountant);
     assertTrue(queryScheduler instanceof TokenPriorityScheduler);
 
     config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, 
QuerySchedulerFactory.BOUNDED_FCFS_ALGORITHM);
-    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, 
serverMetrics, latestQueryTime);
+    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, 
serverMetrics, latestQueryTime, accountant);
     assertTrue(queryScheduler instanceof BoundedFCFSScheduler);
 
     config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY,
         QuerySchedulerFactory.BINARY_WORKLOAD_ALGORITHM);
-    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, 
serverMetrics, latestQueryTime);
+    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, 
serverMetrics, latestQueryTime, accountant);
     assertTrue(queryScheduler instanceof BinaryWorkloadScheduler);
 
     config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, 
TestQueryScheduler.class.getName());
-    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, 
serverMetrics, latestQueryTime);
+    queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, 
serverMetrics, latestQueryTime, accountant);
     assertTrue(queryScheduler instanceof TestQueryScheduler);
   }
 
@@ -69,7 +73,9 @@ public class QuerySchedulerFactoryTest {
 
     public TestQueryScheduler(PinotConfiguration config, QueryExecutor 
queryExecutor, ServerMetrics serverMetrics,
         LongAccumulator latestQueryTime) {
-      super(config, queryExecutor, new UnboundedResourceManager(config), 
serverMetrics, latestQueryTime);
+      super(config, queryExecutor,
+          new UnboundedResourceManager(config, new 
Tracing.DefaultThreadResourceUsageAccountant()), serverMetrics,
+          latestQueryTime);
     }
 
     @Override
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
index 6e6b01cf431..23f7b9f7b1f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -48,7 +49,7 @@ public class ResourceManagerTest {
 
   private ResourceManager getResourceManager(int runners, int workers, final 
int softLimit, final int hardLimit) {
 
-    return new ResourceManager(getConfig(runners, workers)) {
+    return new ResourceManager(getConfig(runners, workers), new 
Tracing.DefaultThreadResourceUsageAccountant()) {
 
       @Override
       public QueryExecutorService getExecutorService(ServerQueryRequest query, 
SchedulerGroupAccountant accountant) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManagerTest.java
index 7031a8ea227..0baeec19e52 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/UnboundedResourceManagerTest.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -35,7 +36,8 @@ public class UnboundedResourceManagerTest {
 
   @Test
   public void testDefault() {
-    UnboundedResourceManager rm = new UnboundedResourceManager(new 
PinotConfiguration());
+    UnboundedResourceManager rm =
+        new UnboundedResourceManager(new PinotConfiguration(), new 
Tracing.DefaultThreadResourceUsageAccountant());
     assertTrue(rm.getNumQueryRunnerThreads() > 1);
     assertTrue(rm.getNumQueryWorkerThreads() >= 1);
     assertEquals(rm.getTableThreadsHardLimit(), rm.getNumQueryRunnerThreads() 
+ rm.getNumQueryWorkerThreads());
@@ -51,7 +53,8 @@ public class UnboundedResourceManagerTest {
     properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, runners);
     properties.put(ResourceManager.QUERY_WORKER_CONFIG_KEY, workers);
 
-    UnboundedResourceManager rm = new UnboundedResourceManager(new 
PinotConfiguration(properties));
+    UnboundedResourceManager rm = new UnboundedResourceManager(new 
PinotConfiguration(properties),
+        new Tracing.DefaultThreadResourceUsageAccountant());
     assertEquals(rm.getNumQueryWorkerThreads(), workers);
     assertEquals(rm.getNumQueryRunnerThreads(), runners);
     assertEquals(rm.getTableThreadsHardLimit(), runners + workers);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index d98f919f29e..054a80ef14d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -75,10 +75,12 @@ import 
org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesServerPlanVis
 import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
 import org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.executor.ExecutorServiceUtils;
 import org.apache.pinot.spi.executor.HardLimitExecutor;
 import org.apache.pinot.spi.executor.MetricsExecutor;
+import org.apache.pinot.spi.executor.ThrottleOnCriticalHeapUsageExecutor;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
@@ -144,7 +146,7 @@ public class QueryRunner {
    * <p>Should be called only once and before calling any other method.
    */
   public void init(PinotConfiguration serverConf, InstanceDataManager 
instanceDataManager,
-      @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats) {
+      @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats, 
ThreadResourceUsageAccountant resourceUsageAccountant) {
     String hostname = 
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     if (hostname.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
       hostname = 
hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
@@ -208,6 +210,12 @@ public class QueryRunner {
       _executorService = new HardLimitExecutor(hardLimit, _executorService);
     }
 
+    if 
(serverConf.getProperty(Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
+        Server.DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE)) {
+      LOGGER.info("Enable OOM Throttling on critical heap usage for 
multi-stage executor");
+      _executorService = new 
ThrottleOnCriticalHeapUsageExecutor(_executorService, resourceUsageAccountant);
+    }
+
     _opChainScheduler = new OpChainSchedulerService(_executorService, 
serverConf);
     _mailboxService = new MailboxService(hostname, port, serverConf, 
tlsConfig);
     try {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 43e99f1c0cb..5df708551be 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -30,6 +30,7 @@ import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
 
 
@@ -61,7 +62,8 @@ public class QueryServerEnclosure {
     
runnerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
 _queryRunnerPort);
     InstanceDataManager instanceDataManager = 
factory.buildInstanceDataManager();
     _queryRunner = new QueryRunner();
-    _queryRunner.init(new PinotConfiguration(runnerConfig), 
instanceDataManager, null, () -> true);
+    _queryRunner.init(new PinotConfiguration(runnerConfig), 
instanceDataManager, null, () -> true,
+        new Tracing.DefaultThreadResourceUsageAccountant());
   }
 
   public int getPort() {
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 2ba34b61bdc..516fb720fe1 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -51,6 +51,7 @@ import org.apache.pinot.server.access.AllowAllAccessFactory;
 import org.apache.pinot.server.conf.ServerConf;
 import org.apache.pinot.server.starter.helix.SendStatsPredicate;
 import org.apache.pinot.server.worker.WorkerQueryServer;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
@@ -85,7 +86,8 @@ public class ServerInstance {
   private boolean _queryServerStarted = false;
 
   public ServerInstance(ServerConf serverConf, HelixManager helixManager, 
AccessControlFactory accessControlFactory,
-      @Nullable SegmentOperationsThrottler segmentOperationsThrottler, 
SendStatsPredicate sendStatsPredicate)
+      @Nullable SegmentOperationsThrottler segmentOperationsThrottler, 
SendStatsPredicate sendStatsPredicate,
+      ThreadResourceUsageAccountant resourceUsageAccountant)
       throws Exception {
     LOGGER.info("Initializing server instance");
     _helixManager = helixManager;
@@ -122,7 +124,8 @@ public class ServerInstance {
     LOGGER.info("Initializing query scheduler");
     _latestQueryTime = new LongAccumulator(Long::max, 0);
     _queryScheduler =
-        QuerySchedulerFactory.create(serverConf.getSchedulerConfig(), 
_queryExecutor, _serverMetrics, _latestQueryTime);
+        QuerySchedulerFactory.create(serverConf.getSchedulerConfig(), 
_queryExecutor, _serverMetrics, _latestQueryTime,
+            resourceUsageAccountant);
 
     TlsConfig tlsConfig =
         TlsUtils.extractTlsConfig(serverConf.getPinotConfig(), 
CommonConstants.Server.SERVER_TLS_PREFIX);
@@ -135,7 +138,7 @@ public class ServerInstance {
     if (serverConf.isMultiStageServerEnabled()) {
       LOGGER.info("Initializing Multi-stage query engine");
       _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), 
_instanceDataManager,
-          serverConf.isMultiStageEngineTlsEnabled() ? tlsConfig : null, 
sendStatsPredicate);
+          serverConf.isMultiStageEngineTlsEnabled() ? tlsConfig : null, 
sendStatsPredicate, resourceUsageAccountant);
     } else {
       _workerQueryServer = null;
     }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index fa8b8fa6bc9..044d33bf7cb 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -663,14 +663,6 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
               segmentDownloadThrottler, 
segmentMultiColTextIndexPreprocessThrottler);
     }
 
-    SendStatsPredicate sendStatsPredicate = 
SendStatsPredicate.create(_serverConf, _helixManager);
-    ServerConf serverConf = new ServerConf(_serverConf);
-    _serverInstance = new ServerInstance(serverConf, _helixManager, 
_accessControlFactory, _segmentOperationsThrottler,
-        sendStatsPredicate);
-    ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
-
-    InstanceDataManager instanceDataManager = 
_serverInstance.getInstanceDataManager();
-    instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> 
_isServerReadyToServeQueries);
     // initialize the thread accountant for query killing
     Tracing.ThreadAccountantOps.initializeThreadAccountant(
         _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), 
_instanceId,
@@ -680,6 +672,15 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
           Tracing.getThreadAccountant().getClusterConfigChangeListener());
     }
 
+    SendStatsPredicate sendStatsPredicate = 
SendStatsPredicate.create(_serverConf, _helixManager);
+    ServerConf serverConf = new ServerConf(_serverConf);
+    _serverInstance = new ServerInstance(serverConf, _helixManager, 
_accessControlFactory, _segmentOperationsThrottler,
+        sendStatsPredicate, Tracing.getThreadAccountant());
+    ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
+
+    InstanceDataManager instanceDataManager = 
_serverInstance.getInstanceDataManager();
+    instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> 
_isServerReadyToServeQueries);
+
     initSegmentFetcher(_serverConf);
     StateModelFactory<?> stateModelFactory =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index f8372d9960c..9528ec79af0 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -24,6 +24,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.service.server.QueryServer;
 import org.apache.pinot.server.starter.helix.SendStatsPredicate;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.NetUtils;
@@ -34,13 +35,14 @@ public class WorkerQueryServer {
   private final QueryServer _queryWorkerService;
 
   public WorkerQueryServer(PinotConfiguration serverConf, InstanceDataManager 
instanceDataManager,
-      @Nullable TlsConfig tlsConfig, SendStatsPredicate sendStats) {
+      @Nullable TlsConfig tlsConfig, SendStatsPredicate sendStats,
+      ThreadResourceUsageAccountant resourceUsageAccountant) {
     serverConf = toWorkerQueryConfig(serverConf);
     String instanceId = 
serverConf.getProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_ID);
     _queryServicePort = 
serverConf.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
         CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
     QueryRunner queryRunner = new QueryRunner();
-    queryRunner.init(serverConf, instanceDataManager, tlsConfig, 
sendStats::isSendStats);
+    queryRunner.init(serverConf, instanceDataManager, tlsConfig, 
sendStats::isSendStats, resourceUsageAccountant);
     _queryWorkerService = new QueryServer(instanceId, _queryServicePort, 
queryRunner, tlsConfig, serverConf);
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index b3a270aea1e..1d6d590e0fa 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -84,6 +84,10 @@ public interface ThreadResourceUsageAccountant {
    */
   void sampleUsageMSE();
 
+  default boolean throttleQuerySubmission() {
+    return false;
+  }
+
   /**
    * special interface to aggregate usage to the stats store only once, it is 
used for response
    * ser/de threads where the thread execution context cannot be setup before 
hands as
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java
new file mode 100644
index 00000000000..091fb5e3fe0
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.executor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+
+
+/**
+ * An Executor that throttles task submission when the heap usage is critical.
+ * Heap Usage level is obtained from {@link 
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ */
+public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
+  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    super(executorService);
+    _threadResourceUsageAccountant = threadResourceUsageAccountant;
+  }
+
+  protected void checkTaskAllowed() {
+    if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
+      throw new IllegalStateException("Tasks throttled due to high heap 
usage.");
+    }
+  }
+
+  @Override
+  protected <T> Callable<T> decorate(Callable<T> task) {
+    checkTaskAllowed();
+    return () -> {
+      checkTaskAllowed();
+      return task.call();
+    };
+  }
+
+  @Override
+  protected Runnable decorate(Runnable task) {
+    checkTaskAllowed();
+    return () -> {
+      checkTaskAllowed();
+      task.run();
+    };
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3a2b703edc1..b8a4d16152e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1099,6 +1099,11 @@ public class CommonConstants {
     @Deprecated
     public static final String DEFAULT_QUERY_EXECUTOR_OPCHAIN_EXECUTOR = 
DEFAULT_MULTISTAGE_EXECUTOR_TYPE;
 
+    // Enable SSE & MSE task throttling on critical heap usage.
+    public static final String 
CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE =
+        QUERY_EXECUTOR_CONFIG_PREFIX + ".enableThrottlingOnHeapUsage";
+    public static final boolean 
DEFAULT_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE = false;
+
     /**
      * The ExecutorServiceProvider to be used for timeseries threads.
      *
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
new file mode 100644
index 00000000000..82bc2e808df
--- /dev/null
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.executor;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+
+public class ThrottleOnCriticalHeapUsageExecutorTest {
+  @Test
+  void testThrottle() throws Exception {
+    ThreadResourceUsageAccountant accountant = new 
ThreadResourceUsageAccountant() {
+      final AtomicLong _numCalls = new AtomicLong(0);
+      @Override
+      public void clear() {
+      }
+
+      @Override
+      public boolean isAnchorThreadInterrupted() {
+        return false;
+      }
+
+      @Override
+      public void createExecutionContext(String queryId, int taskId, 
ThreadExecutionContext.TaskType taskType,
+          @Nullable ThreadExecutionContext parentContext) {
+      }
+
+      @Override
+      public void setupRunner(String queryId, int taskId, 
ThreadExecutionContext.TaskType taskType) {
+      }
+
+      @Override
+      public void setupWorker(int taskId, ThreadExecutionContext.TaskType 
taskType,
+          @Nullable ThreadExecutionContext parentContext) {
+      }
+
+      @Nullable
+      @Override
+      public ThreadExecutionContext getThreadExecutionContext() {
+        return null;
+      }
+
+      @Override
+      public void setThreadResourceUsageProvider(ThreadResourceUsageProvider 
threadResourceUsageProvider) {
+      }
+
+      @Override
+      public void sampleUsage() {
+      }
+
+      @Override
+      public void sampleUsageMSE() {
+      }
+
+      @Override
+      public boolean throttleQuerySubmission() {
+        return _numCalls.getAndIncrement() > 1;
+      }
+
+      @Override
+      public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, 
long allocatedBytes) {
+      }
+
+      @Override
+      public void updateQueryUsageConcurrently(String queryId) {
+      }
+
+      @Override
+      public void startWatcherTask() {
+      }
+
+      @Override
+      public Exception getErrorStatus() {
+        return null;
+      }
+
+      @Override
+      public Collection<? extends ThreadResourceTracker> getThreadResources() {
+        return List.of();
+      }
+
+      @Override
+      public Map<String, ? extends QueryResourceTracker> getQueryResources() {
+        return Map.of();
+      }
+    };
+
+    ThrottleOnCriticalHeapUsageExecutor executor = new 
ThrottleOnCriticalHeapUsageExecutor(
+        Executors.newCachedThreadPool(), accountant);
+
+    CyclicBarrier barrier = new CyclicBarrier(2);
+
+    try {
+      executor.execute(() -> {
+        try {
+          barrier.await();
+          Thread.sleep(Long.MAX_VALUE);
+        } catch (InterruptedException | BrokenBarrierException e) {
+          // do nothing
+        }
+      });
+
+      barrier.await();
+      try {
+        executor.execute(() -> {
+          // do nothing
+        });
+        fail("Should not allow more than 1 task");
+      } catch (Exception e) {
+        // as expected
+        assertEquals(e.getMessage(), "Tasks throttled due to high heap 
usage.");
+      }
+    } catch (BrokenBarrierException | InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to