This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c65891c247f Log only mode for multistage query thread limiting (#16241)
c65891c247f is described below
commit c65891c247faa2f50cc174c7c8e51c807904488c
Author: Satwik Pachigolla <[email protected]>
AuthorDate: Thu Jul 3 23:37:59 2025 -0700
Log only mode for multistage query thread limiting (#16241)
---
.../requesthandler/MultiStageQueryThrottler.java | 56 +++++++++++++++++-----
.../MultiStageQueryThrottlerTest.java | 25 ++++++++++
.../apache/pinot/query/runtime/QueryRunner.java | 18 ++++++-
.../pinot/spi/executor/HardLimitExecutor.java | 19 +++++++-
.../pinot/spi/query/QueryThreadExceedStrategy.java | 42 ++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 6 +++
.../pinot/spi/executor/HardLimitExecutorTest.java | 27 +++++++++++
7 files changed, 176 insertions(+), 17 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java
index e2318505008..1381e1a465c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java
@@ -32,6 +32,7 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.broker.broker.helix.ClusterChangeHandler;
import org.apache.pinot.common.concurrency.AdjustableSemaphore;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadExceedStrategy;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(MultiStageQueryThrottler.class);
private final int _maxServerQueryThreadsFromBrokerConfig;
+ private final QueryThreadExceedStrategy _exceedStrategy;
private final AtomicInteger _currentQueryServerThreads = new AtomicInteger();
/**
* If _maxServerQueryThreads is <= 0, it means that the cluster is not
configured to limit the number of multi-stage
@@ -71,6 +73,20 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {
_maxServerQueryThreadsFromBrokerConfig = brokerConf.getProperty(
CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS,
CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS);
+
+ String strategyStr = brokerConf.getProperty(
+
CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY,
+
CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY);
+ QueryThreadExceedStrategy strategy;
+ try {
+ strategy = QueryThreadExceedStrategy.valueOf(strategyStr.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ LOGGER.error("Invalid exceed strategy: {}, using default: {}",
strategyStr,
+
CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY);
+ strategy = QueryThreadExceedStrategy.valueOf(
+
CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY);
+ }
+ _exceedStrategy = strategy;
}
@Override
@@ -117,7 +133,7 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {
return true;
}
- if (numQueryThreads > _semaphore.getTotalPermits()) {
+ if (numQueryThreads > _semaphore.getTotalPermits() && _exceedStrategy !=
QueryThreadExceedStrategy.LOG) {
throw new RuntimeException(
String.format("Can't dispatch query because the estimated number of
server threads for this query is too "
+ "large for the configured value of '%s' or '%s'.
estimatedThreads=%d configuredLimit=%d",
@@ -126,11 +142,25 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {
numQueryThreads, _semaphore.getTotalPermits()));
}
- boolean result = _semaphore.tryAcquire(numQueryThreads, timeout, unit);
- if (result) {
+ if (_exceedStrategy == QueryThreadExceedStrategy.LOG) {
+ boolean wouldThrottle = _currentQueryServerThreads.get() +
numQueryThreads > _maxServerQueryThreads;
+ if (wouldThrottle) {
+ LOGGER.warn(
+ "Exceed strategy LOG: Query would have been throttled.
estimatedThreads: {} availableThreads: {}",
+ numQueryThreads, _maxServerQueryThreads -
_currentQueryServerThreads.get());
+ }
_currentQueryServerThreads.addAndGet(numQueryThreads);
+ return true;
+ } else if (_exceedStrategy == QueryThreadExceedStrategy.WAIT) {
+ boolean result = _semaphore.tryAcquire(numQueryThreads, timeout, unit);
+ if (result) {
+ _currentQueryServerThreads.addAndGet(numQueryThreads);
+ }
+ return result;
+ } else {
+ throw new IllegalStateException(String.format(
+ "%s is configured to an unsupported strategy.",
this.getClass().getName()));
}
- return result;
}
/**
@@ -139,7 +169,7 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {
*/
public void release(int numQueryThreads) {
_currentQueryServerThreads.addAndGet(-1 * numQueryThreads);
- if (_maxServerQueryThreads > 0) {
+ if (_maxServerQueryThreads > 0 && _exceedStrategy ==
QueryThreadExceedStrategy.WAIT) {
_semaphore.release(numQueryThreads);
}
}
@@ -168,14 +198,14 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {
}
}
} else {
- int maxServerQueryThreads = calculateMaxServerQueryThreads();
+ int newMaxServerQueryThreads = calculateMaxServerQueryThreads();
- if (_maxServerQueryThreads == maxServerQueryThreads) {
+ if (_maxServerQueryThreads == newMaxServerQueryThreads) {
return;
}
- if (_maxServerQueryThreads <= 0 && maxServerQueryThreads > 0
- || _maxServerQueryThreads > 0 && maxServerQueryThreads <= 0) {
+ if (_maxServerQueryThreads <= 0 && newMaxServerQueryThreads > 0
+ || _maxServerQueryThreads > 0 && newMaxServerQueryThreads <= 0) {
// This operation isn't safe to do while queries are running so we
require a restart of the broker for this
// change to take effect.
LOGGER.warn("Enabling or disabling limitation of the maximum number of
multi-stage queries running "
@@ -183,8 +213,8 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {
return;
}
- if (maxServerQueryThreads > 0) {
- _maxServerQueryThreads = maxServerQueryThreads;
+ if (newMaxServerQueryThreads > 0) {
+ _maxServerQueryThreads = newMaxServerQueryThreads;
int semaphoreLimit = calculateSemaphoreLimit();
_semaphore.setPermits(semaphoreLimit);
}
@@ -214,8 +244,8 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {
private int calculateSemaphoreLimit() {
int semaphoreLimit = Math.max(1, _maxServerQueryThreads * _numServers /
_numBrokers);
LOGGER.info("Calculating estimated server query threads limit: {} for
maxServerQueryThreads: {}, "
- + "numBrokers: {}, and numServers: {}",
- semaphoreLimit, _maxServerQueryThreads, _numBrokers, _numServers);
+ + "numBrokers: {}, numServers: {}, exceedStrategy: {}",
+ semaphoreLimit, _maxServerQueryThreads, _numBrokers, _numServers,
_exceedStrategy);
return semaphoreLimit;
}
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
index 71cd1e67082..d0c1535f8c9 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
@@ -95,6 +95,31 @@ public class MultiStageQueryThrottlerTest {
Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(1, 100,
TimeUnit.MILLISECONDS));
}
+ @Test
+ public void testAcquireReleaseLogExceedStrategy()
+ throws Exception {
+ when(_helixAdmin.getConfig(any(),
+
eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)))
+
).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
"2"));
+ Map<String, Object> configMap = new HashMap<>();
+
configMap.put(CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY,
"LOG");
+ PinotConfiguration config = new PinotConfiguration(configMap);
+ _multiStageQueryThrottler = new MultiStageQueryThrottler(config);
+
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100,
TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100,
TimeUnit.MILLISECONDS));
+ // over limit but should acquire since log-only is enabled
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100,
TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100,
TimeUnit.MILLISECONDS));
+
+ Assert.assertEquals(_multiStageQueryThrottler.currentQueryServerThreads(),
4);
+
+ _multiStageQueryThrottler.release(2);
+ Assert.assertEquals(_multiStageQueryThrottler.currentQueryServerThreads(),
2);
+ _multiStageQueryThrottler.release(2);
+ Assert.assertEquals(_multiStageQueryThrottler.currentQueryServerThreads(),
0);
+ }
+
@Test
public void testDisabledThrottling()
throws Exception {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 054a80ef14d..4b2fcc3a557 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -82,6 +82,7 @@ import org.apache.pinot.spi.executor.HardLimitExecutor;
import org.apache.pinot.spi.executor.MetricsExecutor;
import org.apache.pinot.spi.executor.ThrottleOnCriticalHeapUsageExecutor;
import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.query.QueryThreadExceedStrategy;
import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
@@ -206,8 +207,21 @@ public class QueryRunner {
int hardLimit =
HardLimitExecutor.getMultiStageExecutorHardLimit(serverConf);
if (hardLimit > 0) {
- LOGGER.info("Setting multi-stage executor hardLimit: {}", hardLimit);
- _executorService = new HardLimitExecutor(hardLimit, _executorService);
+ String strategyStr = serverConf.getProperty(
+
CommonConstants.Server.CONFIG_OF_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY,
+
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+ QueryThreadExceedStrategy exceedStrategy;
+ try {
+ exceedStrategy =
QueryThreadExceedStrategy.valueOf(strategyStr.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ LOGGER.error("Invalid exceed strategy: {}, using default: {}",
strategyStr,
+
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+ exceedStrategy = QueryThreadExceedStrategy.valueOf(
+
CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY);
+ }
+
+ LOGGER.info("Setting multi-stage executor hardLimit: {} exceedStrategy:
{}", hardLimit, exceedStrategy);
+ _executorService = new HardLimitExecutor(hardLimit, _executorService,
exceedStrategy);
}
if
(serverConf.getProperty(Server.CONFIG_OF_ENABLE_QUERY_SCHEDULER_THROTTLING_ON_HEAP_USAGE,
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java
index dedf2a0f492..8f66b4c3a84 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadExceedStrategy;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,11 +37,17 @@ public class HardLimitExecutor extends DecoratorExecutorService {
private final AtomicInteger _running;
private final int _max;
+ private final QueryThreadExceedStrategy _exceedStrategy;
- public HardLimitExecutor(int max, ExecutorService executorService) {
+ public HardLimitExecutor(int max, ExecutorService executorService,
QueryThreadExceedStrategy exceedStrategy) {
super(executorService);
_running = new AtomicInteger(0);
_max = max;
+ _exceedStrategy = exceedStrategy;
+ }
+
+ public HardLimitExecutor(int max, ExecutorService executorService) {
+ this(max, executorService, QueryThreadExceedStrategy.ERROR);
}
/**
@@ -73,7 +80,15 @@ public class HardLimitExecutor extends DecoratorExecutorService {
protected void checkTaskAllowed() {
if (_running.get() >= _max) {
- throw new IllegalStateException("Tasks limit exceeded.");
+ if (_exceedStrategy == QueryThreadExceedStrategy.LOG) {
+ LOGGER.warn("Exceed strategy LOG: Tasks limit max: {} exceeded with
running: {} tasks.",
+ _max, _running.get());
+ } else if (_exceedStrategy == QueryThreadExceedStrategy.ERROR) {
+ throw new IllegalStateException("Tasks limit exceeded.");
+ } else {
+ throw new IllegalStateException(String.format(
+ "%s is configured to an unsupported strategy.",
this.getClass().getName()));
+ }
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadExceedStrategy.java b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadExceedStrategy.java
new file mode 100644
index 00000000000..1d770d4a09d
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadExceedStrategy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.query;
+
+/**
+ * Defines the behavior when query thread limits are exceeded in multistage engine
+ */
+public enum QueryThreadExceedStrategy {
+ /**
+ * Wait for resources to become available
+ * @implNote Not supported by server
+ */
+ WAIT,
+
+ /**
+ * Throw an error immediately
+ * @implNote Not supported by broker
+ */
+ ERROR,
+
+ /**
+ * Logs warning when limits exceeded but allows operations to proceed
+ *
+ */
+ LOG
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index b8a4d16152e..290cf20c231 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -405,6 +405,9 @@ public class CommonConstants {
public static final String CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS =
"pinot.broker.mse.max.server.query.threads";
public static final int DEFAULT_MSE_MAX_SERVER_QUERY_THREADS = -1;
+ public static final String
CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY =
+ "pinot.broker.mse.max.server.query.threads.exceed.strategy";
+ public static final String
DEFAULT_MSE_MAX_SERVER_QUERY_THREADS_EXCEED_STRATEGY = "WAIT";
// Configure the request handler type used by broker to handler inbound
query request.
// NOTE: the request handler type refers to the communication between
Broker and Server.
@@ -1036,6 +1039,9 @@ public class CommonConstants {
public static final String CONFIG_OF_MSE_MAX_EXECUTION_THREADS =
MSE_CONFIG_PREFIX + "." + MAX_EXECUTION_THREADS;
public static final int DEFAULT_MSE_MAX_EXECUTION_THREADS = -1;
+ public static final String
CONFIG_OF_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY =
+ MSE_CONFIG_PREFIX + "." + MAX_EXECUTION_THREADS + ".exceed.strategy";
+ public static final String
DEFAULT_MSE_MAX_EXECUTION_THREADS_EXCEED_STRATEGY = "ERROR";
// For group-by queries with order-by clause, the tail groups are trimmed
off to reduce the memory footprint. To
// ensure the accuracy of the result, {@code max(limit * 5, minTrimSize)}
groups are retained. When
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
index 8eb74466b43..35834783d25 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/HardLimitExecutorTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadExceedStrategy;
import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.Test;
@@ -64,6 +65,32 @@ public class HardLimitExecutorTest {
}
}
+ @Test
+ public void testHardLimitLogExceedStrategy()
+ throws Exception {
+ HardLimitExecutor ex = new HardLimitExecutor(1,
Executors.newCachedThreadPool(), QueryThreadExceedStrategy.LOG);
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ try {
+ ex.execute(() -> {
+ try {
+ barrier.await();
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException | BrokenBarrierException e) {
+ // do nothing
+ }
+ });
+
+ barrier.await();
+
+ ex.execute(() -> {
+ // do nothing, we just don't want it to throw an exception
+ });
+ } finally {
+ ex.shutdownNow();
+ }
+ }
+
@Test
public void testGetMultiStageExecutorHardLimit() {
// Only cluster config is set
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]