Jackie-Jiang commented on code in PR #16080:
URL: https://github.com/apache/pinot/pull/16080#discussion_r2141347061
##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java:
##########
@@ -46,21 +46,28 @@ public HardLimitExecutor(int max, ExecutorService
executorService) {
*/
public static int getMultiStageExecutorHardLimit(PinotConfiguration config) {
try {
- int maxThreads = Integer.parseInt(config.getProperty(
+ int serverConfigLimit =
config.getProperty(CommonConstants.Server.CONFIG_OF_MSE_MAX_EXECUTION_THREADS,
+ CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS);
+ if (serverConfigLimit > 0) {
+ return serverConfigLimit;
+ }
+ int maxThreadsFromClusterConfig = Integer.parseInt(config.getProperty(
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS
));
int hardLimitFactor = Integer.parseInt(config.getProperty(
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR
));
- if (maxThreads <= 0 || hardLimitFactor <= 0) {
+ if (maxThreadsFromClusterConfig <= 0 || hardLimitFactor <= 0) {
return 0;
}
- return maxThreads * hardLimitFactor;
+ return maxThreadsFromClusterConfig * hardLimitFactor;
} catch (NumberFormatException e) {
- return
Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)
- *
Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR);
+ int defaultLimitFromClusterConfig =
+
Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)
+ *
Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR);
+ return
Math.min(CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS,
defaultLimitFromClusterConfig);
Review Comment:
Not introduced in this PR, but it will always return negative value.
Consider log a warning and return `-1` which disables the throttling
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java:
##########
@@ -56,26 +57,28 @@ public class MultiStageQueryThrottler implements
ClusterChangeHandler {
private HelixConfigScope _helixConfigScope;
private int _numBrokers;
private int _numServers;
+ private AdjustableSemaphore _semaphore;
/**
* If _maxServerQueryThreads is <= 0, it means that the cluster is not
configured to limit the number of multi-stage
* queries that can be executed concurrently. In this case, we should not
block the query.
*/
private int _maxServerQueryThreads;
- private AdjustableSemaphore _semaphore;
+ private final int _maxServerQueryThreadsFromServerConfig;
private final AtomicInteger _currentQueryServerThreads = new AtomicInteger();
+ public MultiStageQueryThrottler(PinotConfiguration brokerConf) {
+ _maxServerQueryThreadsFromServerConfig = brokerConf.getProperty(
+ CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS,
Review Comment:
(Code Style) Is the indentation correct?
##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java:
##########
@@ -46,21 +50,31 @@ public HardLimitExecutor(int max, ExecutorService
executorService) {
*/
public static int getMultiStageExecutorHardLimit(PinotConfiguration config) {
try {
- int maxThreads = Integer.parseInt(config.getProperty(
+ int maxThreadsFromClusterConfig = Integer.parseInt(config.getProperty(
Review Comment:
Consider renaming the variable given this is instance level config
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -195,6 +195,7 @@ public void init(PinotConfiguration config,
InstanceDataManager instanceDataMana
int hardLimit = HardLimitExecutor.getMultiStageExecutorHardLimit(config);
if (hardLimit > 0) {
+ LOGGER.info("Setting hard limit for multi-stage executor to:
hardLimit={}", hardLimit);
Review Comment:
We don't usually use `k=v` in the log. Most common way is `key: {}`
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java:
##########
@@ -86,7 +89,10 @@ public void init(HelixManager helixManager) {
.count());
if (_maxServerQueryThreads > 0) {
- _semaphore = new AdjustableSemaphore(Math.max(1, _maxServerQueryThreads
* _numServers / _numBrokers), true);
+ int semaphoreLimit = Math.max(1, _maxServerQueryThreads * _numServers /
_numBrokers);
+ LOGGER.info("Setting max server query threads to {} for
_maxServerQueryThreads={}, {} brokers, and {} servers",
Review Comment:
(minor) We usually put colon before variables. Same for other places
```suggestion
LOGGER.info("Setting max server query threads to: {} with
maxServerQueryThreads: {}, numBrokers: {}, numServers: {}",
```
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java:
##########
@@ -56,26 +57,28 @@ public class MultiStageQueryThrottler implements
ClusterChangeHandler {
private HelixConfigScope _helixConfigScope;
private int _numBrokers;
private int _numServers;
+ private AdjustableSemaphore _semaphore;
/**
* If _maxServerQueryThreads is <= 0, it means that the cluster is not
configured to limit the number of multi-stage
* queries that can be executed concurrently. In this case, we should not
block the query.
*/
private int _maxServerQueryThreads;
- private AdjustableSemaphore _semaphore;
+ private final int _maxServerQueryThreadsFromServerConfig;
Review Comment:
```suggestion
private final int _maxServerQueryThreadsFromBrokerConfig;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]