This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c65891c247f Log only mode for multistage query thread limiting (#16241)
c65891c247f is described below

commit c65891c247faa2f50cc174c7c8e51c807904488c
Author: Satwik Pachigolla <[email protected]>
AuthorDate: Thu Jul 3 23:37:59 2025 -0700

    Log only mode for multistage query thread limiting (#16241)
---
 .../requesthandler/MultiStageQueryThrottler.java   | 56 +++++++++++++++++-----
 .../MultiStageQueryThrottlerTest.java              | 25 ++++++++++
 .../apache/pinot/query/runtime/QueryRunner.java    | 18 ++++++-
 .../pinot/spi/executor/HardLimitExecutor.java      | 19 +++++++-
 .../pinot/spi/query/QueryThreadExceedStrategy.java | 42 ++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  6 +++
 .../pinot/spi/executor/HardLimitExecutorTest.java  | 27 +++++++++++
 7 files changed, 176 insertions(+), 17 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java
index e2318505008..1381e1a465c 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java
@@ -32,6 +32,7 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.pinot.broker.broker.helix.ClusterChangeHandler;
 import org.apache.pinot.common.concurrency.AdjustableSemaphore;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadExceedStrategy;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ public class MultiStageQueryThrottler implements 
ClusterChangeHandler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiStageQueryThrottler.class);
 
   private final int _maxServerQueryThreadsFromBrokerConfig;
+  private final QueryThreadExceedStrategy _exceedStrategy;
   private final AtomicInteger _currentQueryServerThreads = new AtomicInteger();
   /**
    * If _maxServerQueryThreads is <= 0, it means that the cluster is not 
configured to limit the number of multi-stage
@@ -71,6 +73,20 @@ public class MultiStageQueryThrottler implements 
ClusterChangeHandler {
     _maxServerQueryThreadsFromBrokerConfig = brokerConf.getProperty(
         CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS,
         CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS);
+
+    String strategyStr = brokerConf.getProperty(
+        
CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY,
+        
CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY);
+    QueryThreadExceedStrategy strategy;
+    try {
+      strategy = QueryThreadExceedStrategy.valueOf(strategyStr.toUpperCase());
+    } catch (IllegalArgumentException e) {
+      LOGGER.error("Invalid exceed strategy: {}, using default: {}", 
strategyStr,
+          
CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY);
+      strategy = QueryThreadExceedStrategy.valueOf(
+          
CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY);
+    }
+    _exceedStrategy = strategy;
   }
 
   @Override
@@ -117,7 +133,7 @@ public class MultiStageQueryThrottler implements 
ClusterChangeHandler {
       return true;
     }
 
-    if (numQueryThreads > _semaphore.getTotalPermits()) {
+    if (numQueryThreads > _semaphore.getTotalPermits() && _exceedStrategy != 
QueryThreadExceedStrategy.LOG) {
       throw new RuntimeException(
           String.format("Can't dispatch query because the estimated number of 
server threads for this query is too "
               + "large for the configured value of '%s' or '%s'. 
estimatedThreads=%d configuredLimit=%d",
@@ -126,11 +142,25 @@ public class MultiStageQueryThrottler implements 
ClusterChangeHandler {
                   numQueryThreads, _semaphore.getTotalPermits()));
     }
 
-    boolean result = _semaphore.tryAcquire(numQueryThreads, timeout, unit);
-    if (result) {
+    if (_exceedStrategy == QueryThreadExceedStrategy.LOG) {
+      boolean wouldThrottle = _currentQueryServerThreads.get() + 
numQueryThreads > _maxServerQueryThreads;
+      if (wouldThrottle) {
+        LOGGER.warn(
+            "Exceed strategy LOG: Query would have been throttled. 
estimatedThreads: {} availableThreads: {}",
+            numQueryThreads, _maxServerQueryThreads - 
_currentQueryServerThreads.get());
+      }
       _currentQueryServerThreads.addAndGet(numQueryThreads);
+      return true;
+    } else if (_exceedStrategy == QueryThreadExceedStrategy.WAIT) {
+      boolean result = _semaphore.tryAcquire(numQueryThreads, timeout, unit);
+      if (result) {
+        _currentQueryServerThreads.addAndGet(numQueryThreads);
+      }
+      return result;
+    } else {
+      throw new IllegalStateException(String.format(
+          "%s is configured to an unsupported strategy.", 
this.getClass().getName()));
     }
-    return result;
   }
 
   /**
@@ -139,7 +169,7 @@ public class MultiStageQueryThrottler implements 
ClusterChangeHandler {
    */
   public void release(int numQueryThreads) {
     _currentQueryServerThreads.addAndGet(-1 * numQueryThreads);
-    if (_maxServerQueryThreads > 0) {
+    if (_maxServerQueryThreads > 0 && _exceedStrategy == 
QueryThreadExceedStrategy.WAIT) {
       _semaphore.release(numQueryThreads);
     }
   }
@@ -168,14 +198,14 @@ public class MultiStageQueryThrottler implements 
ClusterChangeHandler {
         }
       }
     } else {
-      int maxServerQueryThreads = calculateMaxServerQueryThreads();
+      int newMaxServerQueryThreads = calculateMaxServerQueryThreads();
 
-      if (_maxServerQueryThreads == maxServerQueryThreads) {
+      if (_maxServerQueryThreads == newMaxServerQueryThreads) {
         return;
       }
 
-      if (_maxServerQueryThreads <= 0 && maxServerQueryThreads > 0
-          || _maxServerQueryThreads > 0 && maxServerQueryThreads <= 0) {
+      if (_maxServerQueryThreads <= 0 && newMaxServerQueryThreads > 0
+          || _maxServerQueryThreads > 0 && newMaxServerQueryThreads <= 0) {
         // This operation isn't safe to do while queries are running so we 
require a restart of the broker for this
         // change to take effect.
         LOGGER.warn("Enabling or disabling limitation of the maximum number of 
multi-stage queries running "
@@ -183,8 +213,8 @@ public class MultiStageQueryThrottler implements 
ClusterChangeHandler {
         return;
       }
 
-      if (maxServerQueryThreads > 0) {
-        _maxServerQueryThreads = maxServerQueryThreads;
+      if (newMaxServerQueryThreads > 0) {
+        _maxServerQueryThreads = newMaxServerQueryThreads;
         int semaphoreLimit = calculateSemaphoreLimit();
         _semaphore.setPermits(semaphoreLimit);
       }
@@ -214,8 +244,8 @@ public class MultiStageQueryThrottler implements 
ClusterChangeHandler {
   private int calculateSemaphoreLimit() {
     int semaphoreLimit = Math.max(1, _maxServerQueryThreads * _numServers / 
_numBrokers);
     LOGGER.info("Calculating estimated server query threads limit: {} for 
maxServerQueryThreads: {}, "
-            + "numBrokers: {}, and numServers: {}",
-        semaphoreLimit, _maxServerQueryThreads, _numBrokers, _numServers);
+            + "numBrokers: {}, numServers: {}, exceedStrategy: {}",
+        semaphoreLimit, _maxServerQueryThreads, _numBrokers, _numServers, 
_exceedStrategy);
     return semaphoreLimit;
   }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
index 71cd1e67082..d0c1535f8c9 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
@@ -95,6 +95,31 @@ public class MultiStageQueryThrottlerTest {
     Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(1, 100, 
TimeUnit.MILLISECONDS));
   }
 
+  @Test
+  public void testAcquireReleaseLogExceedStrategy()
+      throws Exception {
+    when(_helixAdmin.getConfig(any(),
+        
eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)))
+    
).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
 "2"));
+    Map<String, Object> configMap = new HashMap<>();
+    
configMap.put(CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY,
 "LOG");
+    PinotConfiguration config = new PinotConfiguration(configMap);
+    _multiStageQueryThrottler = new MultiStageQueryThrottler(config);
+
+    Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100, 
TimeUnit.MILLISECONDS));
+    Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100, 
TimeUnit.MILLISECONDS));
+    // over limit but should acquire since log-only is enabled
+    Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100, 
TimeUnit.MILLISECONDS));
+    Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100, 
TimeUnit.MILLISECONDS));
+
+    Assert.assertEquals(_multiStageQueryThrottler.currentQueryServerThreads(), 
4);
+
+    _multiStageQueryThrottler.release(2);
+    Assert.assertEquals(_multiStageQueryThrottler.currentQueryServerThreads(), 
2);
+    _multiStageQueryThrottler.release(2);
+    Assert.assertEquals(_multiStageQueryThrottler.currentQueryServerThreads(), 
0);
+  }
+
   @Test
   public void testDisabledThrottling()
       throws Exception {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 054a80ef14d..4b2fcc3a557 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -82,6 +82,7 @@ import org.apache.pinot.spi.executor.HardLimitExecutor;
 import org.apache.pinot.spi.executor.MetricsExecutor;
 import org.apache.pinot.spi.executor.ThrottleOnCriticalHeapUsageExecutor;
 import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.query.QueryThreadExceedStrategy;
 import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
@@ -206,8 +207,21 @@ public class QueryRunner {
 
     int hardLimit = 
HardLimitExecutor.getMultiStageExecutorHardLimit(serverConf);
     if (hardLimit > 0) {
-      LOGGER.info("Setting multi-stage executor hardLimit: {}", hardLimit);
-      _executorService = new HardLimitExecutor(hardLimit, _executorService);
+      String strategyStr = serverConf.getProperty(
+          
CommonConstants.Server.CONFIG_OF_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY,
+          
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+      QueryThreadExceedStrategy exceedStrategy;
+      try {
+        exceedStrategy = 
QueryThreadExceedStrategy.valueOf(strategyStr.toUpperCase());
+      } catch (IllegalArgumentException e) {
+        LOGGER.error("Invalid exceed strategy: {}, using default: {}", 
strategyStr,
+            
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+        exceedStrategy = QueryThreadExceedStrategy.valueOf(
+            
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+      }
+
+      LOGGER.info("Setting multi-stage executor hardLimit: {} exceedStrategy: 
{}", hardLimit, exceedStrategy);
+      _executorService = new HardLimitExecutor(hardLimit, _executorService, 
exceedStrategy);
     }
 
     if 
(serverConf.getProperty(Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java
index dedf2a0f492..8f66b4c3a84 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadExceedStrategy;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,11 +37,17 @@ public class HardLimitExecutor extends 
DecoratorExecutorService {
 
   private final AtomicInteger _running;
   private final int _max;
+  private final QueryThreadExceedStrategy _exceedStrategy;
 
-  public HardLimitExecutor(int max, ExecutorService executorService) {
+  public HardLimitExecutor(int max, ExecutorService executorService, 
QueryThreadExceedStrategy exceedStrategy) {
     super(executorService);
     _running = new AtomicInteger(0);
     _max = max;
+    _exceedStrategy = exceedStrategy;
+  }
+
+  public HardLimitExecutor(int max, ExecutorService executorService) {
+    this(max, executorService, QueryThreadExceedStrategy.ERROR);
   }
 
   /**
@@ -73,7 +80,15 @@ public class HardLimitExecutor extends 
DecoratorExecutorService {
 
   protected void checkTaskAllowed() {
     if (_running.get() >= _max) {
-      throw new IllegalStateException("Tasks limit exceeded.");
+      if (_exceedStrategy == QueryThreadExceedStrategy.LOG) {
+        LOGGER.warn("Exceed strategy LOG: Tasks limit max: {} exceeded with 
running: {} tasks.",
+            _max, _running.get());
+      } else if (_exceedStrategy == QueryThreadExceedStrategy.ERROR) {
+        throw new IllegalStateException("Tasks limit exceeded.");
+      } else {
+        throw new IllegalStateException(String.format(
+            "%s is configured to an unsupported strategy.", 
this.getClass().getName()));
+      }
     }
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadExceedStrategy.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadExceedStrategy.java
new file mode 100644
index 00000000000..1d770d4a09d
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadExceedStrategy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.query;
+
+/**
+ * Defines the behavior when a query thread limit is exceeded in the multi-stage engine.
+ */
+public enum QueryThreadExceedStrategy {
+  /**
+   * Wait for resources to become available, up to the caller-supplied acquire timeout.
+   * @implNote Not supported by server; this is the broker default
+   */
+  WAIT,
+
+  /**
+   * Reject the operation immediately by throwing an exception.
+   * @implNote Not supported by broker; this is the server default
+   */
+  ERROR,
+
+  /**
+   * Log a warning when the limit is exceeded, but allow the operation to proceed.
+   * @implNote Supported by both broker and server
+   */
+  LOG
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index b8a4d16152e..290cf20c231 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -405,6 +405,9 @@ public class CommonConstants {
 
     public static final String CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS = 
"pinot.broker.mse.max.server.query.threads";
     public static final int DEFAULT_MSE_MAX_SERVER_QUERY_THREADS = -1;
+    public static final String 
CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY =
+        "pinot.broker.mse.max.server.query.threads.exceed.strategy";
+    public static final String 
DEFAULT_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY = "WAIT";
 
     // Configure the request handler type used by broker to handler inbound 
query request.
     // NOTE: the request handler type refers to the communication between 
Broker and Server.
@@ -1036,6 +1039,9 @@ public class CommonConstants {
     public static final String CONFIG_OF_MSE_MAX_EXECUTION_THREADS =
         MSE_CONFIG_PREFIX + "." + MAX_EXECUTION_THREADS;
     public static final int DEFAULT_MSE_MAX_EXECUTION_THREADS = -1;
+    public static final String 
CONFIG_OF_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY =
+        MSE_CONFIG_PREFIX + "." + MAX_EXECUTION_THREADS + ".exceed.strategy";
+    public static final String 
DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY = "ERROR";
 
     // For group-by queries with order-by clause, the tail groups are trimmed 
off to reduce the memory footprint. To
     // ensure the accuracy of the result, {@code max(limit * 5, minTrimSize)} 
groups are retained. When
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
index 8eb74466b43..35834783d25 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Executors;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadExceedStrategy;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.testng.annotations.Test;
 
@@ -64,6 +65,32 @@ public class HardLimitExecutorTest {
     }
   }
 
+  @Test
+  public void testHardLimitLogExceedStrategy()
+      throws Exception {
+    HardLimitExecutor ex = new HardLimitExecutor(1, 
Executors.newCachedThreadPool(), QueryThreadExceedStrategy.LOG);
+    CyclicBarrier barrier = new CyclicBarrier(2);
+
+    try {
+      ex.execute(() -> {
+        try {
+          barrier.await();
+          Thread.sleep(Long.MAX_VALUE);
+        } catch (InterruptedException | BrokenBarrierException e) {
+          // do nothing
+        }
+      });
+
+      barrier.await();
+
+      ex.execute(() -> {
+        // do nothing, we just don't want it to throw an exception
+      });
+    } finally {
+      ex.shutdownNow();
+    }
+  }
+
   @Test
   public void testGetMultiStageExecutorHardLimit() {
     // Only cluster config is set


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to