This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9a7e11682a Database query quota (#13544)
9a7e11682a is described below
commit 9a7e11682a56761684edb1aae4a7ec5b9984c6b8
Author: Shounak kulkarni <[email protected]>
AuthorDate: Mon Aug 5 08:42:26 2024 +0500
Database query quota (#13544)
---
.../broker/broker/helix/BaseBrokerStarter.java | 17 ++
...okerResourceOnlineOfflineStateModelFactory.java | 3 +
.../BrokerUserDefinedMessageHandlerFactory.java | 33 +++
.../broker/broker/helix/ClusterChangeMediator.java | 12 +-
.../HelixExternalViewBasedQueryQuotaManager.java | 219 ++++++++++++++++++--
.../pinot/broker/queryquota/QueryQuotaManager.java | 7 +
.../BaseSingleStageBrokerRequestHandler.java | 10 +-
.../MultiStageBrokerRequestHandler.java | 13 +-
...elixExternalViewBasedQueryQuotaManagerTest.java | 222 +++++++++++++++++----
.../BaseSingleStageBrokerRequestHandlerTest.java | 1 +
.../messages/DatabaseConfigRefreshMessage.java | 60 ++++++
.../pinot/common/metadata/ZKMetadataProvider.java | 85 ++++++++
.../apache/pinot/common/utils/DatabaseUtils.java | 14 ++
.../resources/PinotDatabaseRestletResource.java | 85 +++++++-
.../helix/core/PinotHelixResourceManager.java | 56 ++++++
.../java/org/apache/pinot/core/auth/Actions.java | 2 +
.../tests/QueryQuotaClusterIntegrationTest.java | 207 +++++++++++++++++++
.../apache/pinot/spi/config/DatabaseConfig.java | 56 ++++++
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
19 files changed, 1046 insertions(+), 57 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 04bf6ce921..553228d89c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -107,6 +107,8 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
protected String _instanceId;
private volatile boolean _isStarting = false;
private volatile boolean _isShuttingDown = false;
+
+ protected final List<ClusterChangeHandler> _clusterConfigChangeHandlers =
new ArrayList<>();
protected final List<ClusterChangeHandler> _idealStateChangeHandlers = new
ArrayList<>();
protected final List<ClusterChangeHandler> _externalViewChangeHandlers = new
ArrayList<>();
protected final List<ClusterChangeHandler> _instanceConfigChangeHandlers =
new ArrayList<>();
@@ -214,6 +216,15 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
_instanceConfigChangeHandlers.add(instanceConfigChangeHandler);
}
+ /**
+ * Adds a cluster config change handler to handle Helix cluster config
change callbacks.
+ * <p>NOTE: all change handlers will be run in a single thread, so any slow
change handler can block other change
+ * handlers from running. For slow change handler, make it asynchronous.
+ */
+ public void addClusterConfigChangeHandler(ClusterChangeHandler
clusterConfigChangeHandler) {
+ _clusterConfigChangeHandlers.add(clusterConfigChangeHandler);
+ }
+
/**
* Adds a live instance change handler to handle Helix live instance change
callbacks.
* <p>NOTE: all change handlers will be run in a single thread, so any slow
change handler can block other change
@@ -350,6 +361,10 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
_brokerAdminApplication.start(_listenerConfigs);
LOGGER.info("Initializing cluster change mediator");
+ for (ClusterChangeHandler clusterConfigChangeHandler :
_clusterConfigChangeHandlers) {
+ clusterConfigChangeHandler.init(_spectatorHelixManager);
+ }
+ _clusterConfigChangeHandlers.add(queryQuotaManager);
for (ClusterChangeHandler idealStateChangeHandler :
_idealStateChangeHandlers) {
idealStateChangeHandler.init(_spectatorHelixManager);
}
@@ -368,6 +383,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
liveInstanceChangeHandler.init(_spectatorHelixManager);
}
Map<ChangeType, List<ClusterChangeHandler>> clusterChangeHandlersMap = new
HashMap<>();
+ clusterChangeHandlersMap.put(ChangeType.CLUSTER_CONFIG,
_clusterConfigChangeHandlers);
clusterChangeHandlersMap.put(ChangeType.IDEAL_STATE,
_idealStateChangeHandlers);
clusterChangeHandlersMap.put(ChangeType.EXTERNAL_VIEW,
_externalViewChangeHandlers);
clusterChangeHandlersMap.put(ChangeType.INSTANCE_CONFIG,
_instanceConfigChangeHandlers);
@@ -379,6 +395,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
_spectatorHelixManager.addIdealStateChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
+ _spectatorHelixManager.addClusterfigChangeListener(_clusterChangeMediator);
if (!_liveInstanceChangeHandlers.isEmpty()) {
_spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index 648a2a43ff..41f9e9ef87 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -30,6 +30,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +82,8 @@ public class BrokerResourceOnlineOfflineStateModelFactory
extends StateModelFact
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
+ _queryQuotaManager.createDatabaseRateLimiter(
+
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableNameWithType));
} catch (Exception e) {
LOGGER.error("Caught exception while processing transition from
OFFLINE to ONLINE for table: {}",
tableNameWithType, e);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index 4283b10cc9..2c2cc33532 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -25,9 +25,11 @@ import
org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
+import org.apache.pinot.common.utils.DatabaseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +63,8 @@ public class BrokerUserDefinedMessageHandlerFactory
implements MessageHandlerFac
return new RefreshTableConfigMessageHandler(new
TableConfigRefreshMessage(message), context);
case RoutingTableRebuildMessage.REBUILD_ROUTING_TABLE_MSG_SUB_TYPE:
return new RebuildRoutingTableMessageHandler(new
RoutingTableRebuildMessage(message), context);
+ case DatabaseConfigRefreshMessage.REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE:
+ return new RefreshDatabaseConfigMessageHandler(new
DatabaseConfigRefreshMessage(message), context);
default:
// NOTE: Log a warning and return no-op message handler for
unsupported message sub-types. This can happen when
// a new message sub-type is added, and the sender gets deployed
first while receiver is still running the
@@ -117,6 +121,9 @@ public class BrokerUserDefinedMessageHandlerFactory
implements MessageHandlerFac
// TODO: Fetch the table config here and pass it into the managers, or
consider merging these 2 managers
_routingManager.buildRouting(_tableNameWithType);
_queryQuotaManager.initOrUpdateTableQueryQuota(_tableNameWithType);
+ // only create the rate limiter if not present. This message has no
reason to update the database rate limiter
+ _queryQuotaManager.createDatabaseRateLimiter(
+
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(_tableNameWithType));
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
return result;
@@ -129,6 +136,32 @@ public class BrokerUserDefinedMessageHandlerFactory
implements MessageHandlerFac
}
}
+ private class RefreshDatabaseConfigMessageHandler extends MessageHandler {
+ final String _databaseName;
+
+ RefreshDatabaseConfigMessageHandler(DatabaseConfigRefreshMessage
databaseConfigRefreshMessage,
+ NotificationContext context) {
+ super(databaseConfigRefreshMessage, context);
+ _databaseName = databaseConfigRefreshMessage.getDatabaseName();
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() {
+ // only update the existing rate limiter.
+ // Database rate limiter creation should only be done through table
based change triggers
+ _queryQuotaManager.updateDatabaseRateLimiter(_databaseName);
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type) {
+ LOGGER.error("Got error while refreshing database config for database:
{} (error code: {}, error type: {})",
+ _databaseName, code, type, e);
+ }
+ }
+
private class RebuildRoutingTableMessageHandler extends MessageHandler {
final String _tableNameWithType;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index 202f1a3f8e..f1e25804bf 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -26,11 +26,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.listeners.BatchMode;
+import org.apache.helix.api.listeners.ClusterConfigChangeListener;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.IdealStateChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.PreFetch;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -55,7 +57,7 @@ import org.slf4j.LoggerFactory;
@PreFetch(enabled = false)
public class ClusterChangeMediator
implements IdealStateChangeListener, ExternalViewChangeListener,
InstanceConfigChangeListener,
- LiveInstanceChangeListener {
+ ClusterConfigChangeListener, LiveInstanceChangeListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterChangeMediator.class);
// If no change got for 1 hour, proactively check changes
@@ -192,6 +194,14 @@ public class ClusterChangeMediator
enqueueChange(ChangeType.INSTANCE_CONFIG);
}
+ @Override
+ public void onClusterConfigChange(ClusterConfig clusterConfig,
NotificationContext context) {
+ // Cluster config should be null because Helix pre-fetch is disabled
+ assert clusterConfig == null;
+
+ enqueueChange(ChangeType.CLUSTER_CONFIG);
+ }
+
@Override
public void onLiveInstanceChange(List<LiveInstance> liveInstances,
NotificationContext changeContext) {
// Live instance list should be empty because Helix pre-fetch is disabled
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index dabb95867b..d05de53f3e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -21,14 +21,22 @@ package org.apache.pinot.broker.queryquota;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.collections4.SetUtils;
import org.apache.helix.AccessOption;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
@@ -37,6 +45,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -49,18 +58,30 @@ import org.slf4j.LoggerFactory;
/**
* This class is to support the qps quota feature.
- * It depends on the broker source change to update the dynamic rate limit,
- * which means it only gets updated when a new table added or a broker
restarted.
+ * It allows performing qps quota check at table level and database level
+ * For table level check it depends on the broker source change to update the
dynamic rate limit,
+ * which means it gets updated when a new table added or a broker restarted.
+ * For database level check it depends on the broker as well as cluster config
and database config change
+ * to update the dynamic rate limit, which means it gets updated when
+ * - the default query quota at cluster config is updated
+ * - the database config is updated
+ * - new table is assigned to the broker (rate limiter is created if not
present)
+ * - broker added or removed from cluster
*/
public class HelixExternalViewBasedQueryQuotaManager implements
ClusterChangeHandler, QueryQuotaManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1;
private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60;
+ private static final Set<HelixConstants.ChangeType> CHANGE_TYPES_TO_PROCESS
= SetUtils.hashSet(
+ HelixConstants.ChangeType.EXTERNAL_VIEW,
HelixConstants.ChangeType.INSTANCE_CONFIG,
+ HelixConstants.ChangeType.CLUSTER_CONFIG);
private final BrokerMetrics _brokerMetrics;
private final String _instanceId;
private final AtomicInteger _lastKnownBrokerResourceVersion = new
AtomicInteger(-1);
private final Map<String, QueryQuotaEntity> _rateLimiterMap = new
ConcurrentHashMap<>();
+ private final Map<String, QueryQuotaEntity> _databaseRateLimiterMap = new
ConcurrentHashMap<>();
+ private double _defaultQpsQuotaForDatabase;
private HelixManager _helixManager;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -76,20 +97,22 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
Preconditions.checkState(_helixManager == null,
"HelixExternalViewBasedQueryQuotaManager is already initialized");
_helixManager = helixManager;
_propertyStore = _helixManager.getHelixPropertyStore();
+ _defaultQpsQuotaForDatabase = getDefaultQueryQuotaForDatabase();
getQueryQuotaEnabledFlagFromInstanceConfig();
}
@Override
public void processClusterChange(HelixConstants.ChangeType changeType) {
- Preconditions.checkState(changeType ==
HelixConstants.ChangeType.EXTERNAL_VIEW
- || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal
change type: " + changeType);
+ Preconditions.checkState(CHANGE_TYPES_TO_PROCESS.contains(changeType),
"Illegal change type: " + changeType);
if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
ExternalView brokerResourceEV = HelixHelper
.getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
processQueryRateLimitingExternalViewChange(brokerResourceEV);
- } else {
+ } else if (changeType == HelixConstants.ChangeType.INSTANCE_CONFIG) {
processQueryRateLimitingInstanceConfigChange();
+ } else {
+ processQueryRateLimitingClusterConfigChange();
}
}
@@ -230,6 +253,118 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
}
}
+ /**
+ * Updates the database rate limiter if it already exists. Will not create a
new database rate limiter.
+ * @param databaseName database name for which rate limiter needs to be
updated
+ */
+ public void updateDatabaseRateLimiter(String databaseName) {
+ if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+ }
+
+ // Caller method need not worry about getting lock on _databaseRateLimiterMap
+ // as this method will do idempotent updates to the database rate limiters
+ private synchronized void createOrUpdateDatabaseRateLimiter(List<String>
databaseNames) {
+ ExternalView brokerResource = HelixHelper
+ .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
+ CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ for (String databaseName : databaseNames) {
+ double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+ if (databaseQpsQuota < 0) {
+ buildEmptyOrResetDatabaseRateLimiter(databaseName);
+ continue;
+ }
+ int numOnlineBrokers = getNumOnlineBrokers(databaseName, brokerResource);
+ double perBrokerQpsQuota = databaseQpsQuota / numOnlineBrokers;
+ QueryQuotaEntity oldQueryQuotaEntity =
_databaseRateLimiterMap.get(databaseName);
+ if (oldQueryQuotaEntity == null) {
+ LOGGER.info("Adding new query rate limiter for database {} with rate
{}.", databaseName, perBrokerQpsQuota);
+ QueryQuotaEntity queryQuotaEntity = new
QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota),
+ new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new
MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
+ numOnlineBrokers, databaseQpsQuota, -1);
+ _databaseRateLimiterMap.put(databaseName, queryQuotaEntity);
+ continue;
+ }
+ boolean changeDetected = false;
+ double oldQuota = oldQueryQuotaEntity.getRateLimiter() != null ?
oldQueryQuotaEntity.getRateLimiter().getRate()
+ : -1;
+ if (oldQueryQuotaEntity.getOverallRate() != databaseQpsQuota) {
+ changeDetected = true;
+ LOGGER.info("Overall quota changed for the database from {} to {}",
oldQueryQuotaEntity.getOverallRate(),
+ databaseQpsQuota);
+ oldQueryQuotaEntity.setOverallRate(databaseQpsQuota);
+ }
+ if (oldQueryQuotaEntity.getNumOnlineBrokers() != numOnlineBrokers) {
+ changeDetected = true;
+ LOGGER.info("Number of online brokers changed for the database from {}
to {}",
+ oldQueryQuotaEntity.getNumOnlineBrokers(), numOnlineBrokers);
+ oldQueryQuotaEntity.setNumOnlineBrokers(numOnlineBrokers);
+ }
+ if (!changeDetected) {
+ LOGGER.info("No change detected with the query rate limiter for
database {}", databaseName);
+ continue;
+ }
+ LOGGER.info("Updating existing query rate limiter for database {} from
rate {} to {}", databaseName, oldQuota,
+ perBrokerQpsQuota);
+
oldQueryQuotaEntity.setRateLimiter(RateLimiter.create(perBrokerQpsQuota));
+ }
+ }
+
+ // Pulling this logic to a separate placeholder method so that the quota
split logic
+ // can be enhanced further in isolation.
+ private int getNumOnlineBrokers(String databaseName, ExternalView
brokerResource) {
+ // Tables in database can span across broker tags as we don't maintain a
broker tag to database mapping as of now.
+ // Hence, we consider all online brokers for the rate distribution.
+ // TODO consider computing only the online brokers which serve the tables
under the database
+ return
HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size();
+ }
+
+ /**
+ * Utility to get the effective query quota being imposed on a database.
+ * It is computed based on the default quota set at cluster config and
override set at database config
+ * @param databaseName database name to get the query quota on.
+ * @return effective query quota limit being applied
+ */
+ private double getEffectiveQueryQuotaOnDatabase(String databaseName) {
+ DatabaseConfig databaseConfig =
+
ZKMetadataProvider.getDatabaseConfig(_helixManager.getHelixPropertyStore(),
databaseName);
+ if (databaseConfig != null && databaseConfig.getQuotaConfig() != null
+ && databaseConfig.getQuotaConfig().getMaxQPS() != -1) {
+ return databaseConfig.getQuotaConfig().getMaxQPS();
+ }
+ return _defaultQpsQuotaForDatabase;
+ }
+
+ /**
+ * Creates a new database rate limiter. Will not update the database rate
limiter if it already exists.
+ * @param databaseName database name for which rate limiter needs to be
created
+ */
+ public void createDatabaseRateLimiter(String databaseName) {
+ if (_databaseRateLimiterMap.containsKey(databaseName)) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+ }
+
+ /**
+ * Build an empty rate limiter in the new query quota entity, or set the
rate limiter to null in an existing query
+ * quota entity.
+ */
+ private void buildEmptyOrResetDatabaseRateLimiter(String databaseName) {
+ QueryQuotaEntity queryQuotaEntity =
_databaseRateLimiterMap.get(databaseName);
+ if (queryQuotaEntity == null) {
+ // Create an QueryQuotaEntity object without setting a rate limiter.
+ queryQuotaEntity = new QueryQuotaEntity(null, new
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+ new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0);
+ _databaseRateLimiterMap.put(databaseName, queryQuotaEntity);
+ } else {
+ // Set rate limiter to null for an existing QueryQuotaEntity object.
+ queryQuotaEntity.setRateLimiter(null);
+ }
+ }
+
/**
* Build an empty rate limiter in the new query quota entity, or set the
rate limiter to null in an existing query
* quota entity.
@@ -279,6 +414,20 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
}
}
+ @Override
+ public boolean acquireDatabase(String databaseName) {
+ // Return true if query quota is disabled in the current broker.
+ if (isQueryRateLimitDisabled()) {
+ return true;
+ }
+ QueryQuotaEntity queryQuota = _databaseRateLimiterMap.get(databaseName);
+ if (queryQuota == null) {
+ return true;
+ }
+ LOGGER.debug("Trying to acquire token for database: {}", databaseName);
+ return tryAcquireToken(databaseName, queryQuota);
+ }
+
/**
* {@inheritDoc}
* <p>Acquires a token from rate limiter based on the table name.
@@ -291,7 +440,6 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
if (isQueryRateLimitDisabled()) {
return true;
}
- LOGGER.debug("Trying to acquire token for table: {}", tableName);
String offlineTableName = null;
String realtimeTableName = null;
QueryQuotaEntity offlineTableQueryQuotaEntity = null;
@@ -311,21 +459,27 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
realtimeTableQueryQuotaEntity = _rateLimiterMap.get(realtimeTableName);
}
- boolean offlineQuotaOk =
- offlineTableQueryQuotaEntity == null ||
tryAcquireToken(offlineTableName, offlineTableQueryQuotaEntity);
- boolean realtimeQuotaOk =
- realtimeTableQueryQuotaEntity == null ||
tryAcquireToken(realtimeTableName, realtimeTableQueryQuotaEntity);
+ boolean offlineQuotaOk = true;
+ if (offlineTableQueryQuotaEntity != null) {
+ LOGGER.debug("Trying to acquire token for table: {}", offlineTableName);
+ offlineQuotaOk = tryAcquireToken(offlineTableName,
offlineTableQueryQuotaEntity);
+ }
+ boolean realtimeQuotaOk = true;
+ if (realtimeTableQueryQuotaEntity != null) {
+ LOGGER.debug("Trying to acquire token for table: {}", realtimeTableName);
+ realtimeQuotaOk = tryAcquireToken(realtimeTableName,
realtimeTableQueryQuotaEntity);
+ }
return offlineQuotaOk && realtimeQuotaOk;
}
/**
* Try to acquire token from rate limiter. Emit the utilization of the qps
quota if broker metric isn't null.
- * @param tableNameWithType table name with type.
+ * @param resourceName resource name to acquire.
* @param queryQuotaEntity query quota entity for type-specific table.
* @return true if there's no qps quota for that table, or a token is
acquired successfully.
*/
- private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity
queryQuotaEntity) {
+ private boolean tryAcquireToken(String resourceName, QueryQuotaEntity
queryQuotaEntity) {
// Use hit counter to count the number of hits.
queryQuotaEntity.getQpsTracker().hit();
queryQuotaEntity.getMaxQpsTracker().hit();
@@ -340,7 +494,7 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
// Emit the qps capacity utilization rate.
int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
if (!rateLimiter.tryAcquire()) {
- LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}.
Current qps: {}", tableNameWithType,
+ LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate:
{}. Current qps: {}", resourceName,
perBrokerRate, numHits);
return false;
}
@@ -353,6 +507,11 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
return _rateLimiterMap.size();
}
+ @VisibleForTesting
+ public Map<String, QueryQuotaEntity> getDatabaseRateLimiterMap() {
+ return _databaseRateLimiterMap;
+ }
+
@VisibleForTesting
public QueryQuotaEntity getRateLimiterForTable(String tableNameWithType) {
return _rateLimiterMap.get(tableNameWithType);
@@ -448,6 +607,19 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
numRebuilt++;
}
}
+
+ // handle EV change for database query quotas
+ int onlineBrokerCount =
HelixHelper.getOnlineInstanceFromExternalView(currentBrokerResourceEV).size();
+ for (Map.Entry<String, QueryQuotaEntity> it :
_databaseRateLimiterMap.entrySet()) {
+ QueryQuotaEntity quota = it.getValue();
+ if (quota.getNumOnlineBrokers() != onlineBrokerCount) {
+ quota.setNumOnlineBrokers(onlineBrokerCount);
+ }
+ if (quota.getOverallRate() > 0) {
+ quota.setRateLimiter(RateLimiter.create(quota.getOverallRate() /
onlineBrokerCount));
+ }
+ }
+
if (isQueryRateLimitDisabled()) {
LOGGER.info("Query rate limiting is currently disabled for this broker.
So it won't take effect immediately.");
}
@@ -458,6 +630,27 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
numRebuilt, _rateLimiterMap.size());
}
+ /**
+ * Process query quota state change when cluster config gets changed
+ */
+ public void processQueryRateLimitingClusterConfigChange() {
+ double oldDatabaseQpsQuota = _defaultQpsQuotaForDatabase;
+ _defaultQpsQuotaForDatabase = getDefaultQueryQuotaForDatabase();
+ if (oldDatabaseQpsQuota == _defaultQpsQuotaForDatabase) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(new
ArrayList<>(_databaseRateLimiterMap.keySet()));
+ }
+
+ private double getDefaultQueryQuotaForDatabase() {
+ HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
+ HelixConfigScope configScope = new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+ .forCluster(_helixManager.getClusterName()).build();
+ return Double.parseDouble(helixAdmin.getConfig(configScope,
+
Collections.singletonList(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND))
+
.getOrDefault(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND, "-1"));
+ }
+
/**
* Process query quota state change when instance config gets changed
*/
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
index 6fd335e4d3..57faef8778 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
@@ -26,4 +26,11 @@ public interface QueryQuotaManager {
* @return {@code true} if the table quota has not been reached, {@code
false} otherwise
*/
boolean acquire(String tableName);
+
+ /**
+ * Try to acquire a quota for the given database.
+ * @param databaseName database name
+ * @return {@code true} if the database quota has not been reached, {@code
false} otherwise
+ */
+ boolean acquireDatabase(String databaseName);
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 6a839d5e36..dfac99bb82 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -506,6 +506,14 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
}
// Validate QPS quota
+ String database =
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName);
+ if (!_queryQuotaManager.acquireDatabase(database)) {
+ String errorMessage =
+ String.format("Request %d: %s exceeds query quota for database:
%s", requestId, query, database);
+ LOGGER.info(errorMessage);
+
requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
+ return new
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
errorMessage));
+ }
if (!_queryQuotaManager.acquire(tableName)) {
String errorMessage =
String.format("Request %d: %s exceeds query quota for table: %s",
requestId, query, tableName);
@@ -531,7 +539,7 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
if (!pinotQuery.isExplain() && _enableMultistageMigrationMetric) {
// Check if the query is a v2 supported query
- String database =
DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(),
httpHeaders);
+ database =
DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(),
httpHeaders);
// Attempt to add the query to the compile queue; drop if queue is full
if (!_multistageCompileQueryQueue.offer(Pair.of(query, database))) {
LOGGER.trace("Not compiling query `{}` using the multi-stage query
engine because the query queue is full",
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index c8f7c4c2f6..a1e82dbd53 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -129,10 +129,11 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
long compilationStartTimeNs = System.nanoTime();
long queryTimeoutMs;
QueryEnvironment.QueryPlannerResult queryPlanResult;
+ String database;
try {
Long timeoutMsFromQueryOption =
QueryOptionsUtils.getTimeoutMs(queryOptions);
queryTimeoutMs = timeoutMsFromQueryOption != null ?
timeoutMsFromQueryOption : _brokerTimeoutMs;
- String database =
DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
+ database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions,
httpHeaders);
QueryEnvironment queryEnvironment = new QueryEnvironment(database,
_tableCache, _workerManager);
switch (sqlNodeAndOptions.getSqlNode().getKind()) {
case EXPLAIN:
@@ -204,7 +205,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
// Validate QPS quota
- if (hasExceededQPSQuota(tableNames, requestContext)) {
+ if (hasExceededQPSQuota(database, tableNames, requestContext)) {
String errorMessage = String.format("Request %d: %s exceeds query
quota.", requestId, query);
return new
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
errorMessage));
}
@@ -327,7 +328,13 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
/**
* Returns true if the QPS quota of the tables has exceeded.
*/
- private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext
requestContext) {
+ private boolean hasExceededQPSQuota(@Nullable String database, Set<String>
tableNames,
+ RequestContext requestContext) {
+ if (database != null && !_queryQuotaManager.acquireDatabase(database)) {
+ LOGGER.warn("Request {}: query exceeds quota for database: {}",
requestContext.getRequestId(), database);
+ requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
+ return true;
+ }
for (String tableName : tableNames) {
if (!_queryQuotaManager.acquire(tableName)) {
LOGGER.warn("Request {}: query exceeds quota for table: {}",
requestContext.getRequestId(), tableName);
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
index a22f8bdb57..760f3170f9 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.broker.queryquota;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -27,7 +28,6 @@ import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -38,6 +38,7 @@ import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -58,10 +59,17 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
private HelixManager _helixManager;
private HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
private ZkStarter.ZookeeperInstance _zookeeperInstance;
+ private static final Map<String, String> CLUSTER_CONFIG_MAP = new
HashMap<>();
private static final String RAW_TABLE_NAME = "testTable";
private static final String OFFLINE_TABLE_NAME = RAW_TABLE_NAME + "_OFFLINE";
private static final String REALTIME_TABLE_NAME = RAW_TABLE_NAME +
"_REALTIME";
private static final String BROKER_INSTANCE_ID = "broker_instance_1";
+ private static final long TABLE_MAX_QPS = 25;
+ private static final String TABLE_MAX_QPS_STR =
String.valueOf(TABLE_MAX_QPS);
+ private static final long DATABASE_HIGH_QPS = 40;
+ private static final String DATABASE_HIGH_QPS_STR =
String.valueOf(DATABASE_HIGH_QPS);
+ private static final long DATABASE_LOW_QPS = 10;
+ private static final String DATABASE_LOW_QPS_STR =
String.valueOf(DATABASE_LOW_QPS);
@BeforeTest
public void beforeTest() {
@@ -82,7 +90,6 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
}
public class FakeHelixManager extends ZKHelixManager {
- private ZkHelixPropertyStore<ZNRecord> _propertyStore;
FakeHelixManager(String clusterName, String instanceName, InstanceType
instanceType, String zkAddress) {
super(clusterName, instanceName, instanceType, zkAddress);
@@ -90,13 +97,6 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
ZkClient.DEFAULT_SESSION_TIMEOUT,
ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
_zkclient.deleteRecursively("/" + clusterName + "/PROPERTYSTORE");
_zkclient.createPersistent("/" + clusterName + "/PROPERTYSTORE", true);
- setPropertyStore(clusterName);
- }
-
- void setPropertyStore(String clusterName) {
- _propertyStore =
- new ZkHelixPropertyStore<>(new
ZkBaseDataAccessor<ZNRecord>(_zkclient), "/" + clusterName + "/PROPERTYSTORE",
- null);
}
void closeZkClient() {
@@ -119,8 +119,16 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
@Override
public Map<String, String> getConfig(HelixConfigScope scope, List<String>
keys) {
+ if
(scope.getType().equals(HelixConfigScope.ConfigScopeProperty.CLUSTER)) {
+ return CLUSTER_CONFIG_MAP;
+ }
return _instanceConfigMap;
}
+
+ @Override
+ public ExternalView getResourceExternalView(String clusterName, String
resourceName) {
+ return generateBrokerResource(OFFLINE_TABLE_NAME);
+ }
}
@AfterMethod
@@ -129,8 +137,11 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
_testPropertyStore.reset();
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore,
OFFLINE_TABLE_NAME);
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore,
REALTIME_TABLE_NAME);
+ ZKMetadataProvider.removeDatabaseConfig(_testPropertyStore,
CommonConstants.DEFAULT_DATABASE);
+ CLUSTER_CONFIG_MAP.clear();
}
_queryQuotaManager.cleanUpRateLimiterMap();
+ _queryQuotaManager.getDatabaseRateLimiterMap().clear();
}
@AfterTest
@@ -152,17 +163,129 @@ public class HelixExternalViewBasedQueryQuotaManagerTest
{
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
// All the request should be passed.
- runQueries(70, 10);
+ runQueries();
+
+ _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ }
+
+ @Test
+ public void testOfflineTableNotnullQuotaWithHigherDefaultDatabaseQuota()
+ throws Exception {
+ ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+ TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+ ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+ setQps(tableConfig);
+ _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+ Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(),
1);
+
+ setDefaultDatabaseQps("40");
+ // qps within table and default database qps quota
+ runQueries(25, false);
+ // qps exceeding table qps quota but within default database quota
+ runQueries(40, true);
_queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
- public void testOfflineTableWithNullQuotaAndNoRealtimeTableConfig()
+ public void testOfflineTableNotnullQuotaWithLowerDefaultDatabaseQuota()
throws Exception {
ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+ ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+ setQps(tableConfig);
+ _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+ Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(),
1);
+
+ setDefaultDatabaseQps(DATABASE_LOW_QPS_STR);
+ // qps within table and default database qps quota
+ runQueries(DATABASE_LOW_QPS, false);
+ // qps within table qps quota but exceeding default database quota
+ runQueries(TABLE_MAX_QPS, true);
+
+ _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ }
+
+ @Test
+ public void testOfflineTableNotnullQuotaWithHigherDatabaseQuota()
+ throws Exception {
+ ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+ TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+ ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+ setQps(tableConfig);
+ _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+
+ DatabaseConfig databaseConfig = generateDefaultDatabaseConfig();
+ setHigherDatabaseQps(databaseConfig);
+ // qps within table and database qps quota
+ runQueries(TABLE_MAX_QPS, false);
+ // qps exceeding table qps quota but within database quota
+ runQueries(DATABASE_HIGH_QPS, true);
+
+ _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ }
+
+ @Test
+ public void testOfflineTableNotnullQuotaWithLowerDatabaseQuota()
+ throws Exception {
+ ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+ TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+ ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+ setQps(tableConfig);
+ _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+
+ DatabaseConfig databaseConfig = generateDefaultDatabaseConfig();
+ setLowerDatabaseQps(databaseConfig);
+ // qps within table and database qps quota
+ runQueries(DATABASE_LOW_QPS, false);
+ // qps within table qps quota but exceeding database quota
+ runQueries(TABLE_MAX_QPS, true);
+
+ _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ }
+
+ @Test
+ public void testCreateOrUpdateDatabaseRateLimiter() {
+ List<String> dbList = new ArrayList<>(2);
+ dbList.add("db1");
+ dbList.add("db2");
+ dbList.add("db3");
+ DatabaseConfig db1 = new DatabaseConfig(dbList.get(0), new
QuotaConfig(null, null));
+ DatabaseConfig db2 = new DatabaseConfig(dbList.get(1), new
QuotaConfig(null, "1"));
+ DatabaseConfig db3 = new DatabaseConfig(dbList.get(2), new
QuotaConfig(null, "2"));
+ ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1);
+ ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2);
+ ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db3);
+ dbList.forEach(db -> _queryQuotaManager.createDatabaseRateLimiter(db));
+ Map<String, QueryQuotaEntity> dbQuotaMap =
_queryQuotaManager.getDatabaseRateLimiterMap();
+ Assert.assertNull(dbQuotaMap.get(dbList.get(0)).getRateLimiter());
+
Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(),
1);
+
Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(),
2);
+ db1.setQuotaConfig(new QuotaConfig(null, "1"));
+ db2.setQuotaConfig(new QuotaConfig(null, "2"));
+ ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1);
+ ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2);
+ dbList.forEach(db -> _queryQuotaManager.updateDatabaseRateLimiter(db));
+
Assert.assertEquals(dbQuotaMap.get(dbList.get(0)).getRateLimiter().getRate(),
1);
+
Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(),
2);
+
Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(),
2);
+ }
+
+ @Test
+ public void testOfflineTableWithNullQuotaAndNoRealtimeTableConfig() {
+ ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+ TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
QueryQuotaEntity queryQuotaEntity =
_queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
@@ -194,7 +317,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
@Test
public void
testOfflineTableWithNullQuotaButWithRealtimeTableConfigNotNullQpsConfig()
throws Exception {
- QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
+ QuotaConfig quotaConfig = new QuotaConfig("6G", TABLE_MAX_QPS_STR);
TableConfig realtimeTableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
@@ -220,7 +343,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
brokerResource.setState(REALTIME_TABLE_NAME, BROKER_INSTANCE_ID, "ONLINE");
brokerResource.setState(REALTIME_TABLE_NAME, "broker_instance_2",
"OFFLINE");
- QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
+ QuotaConfig quotaConfig = new QuotaConfig("6G", TABLE_MAX_QPS_STR);
TableConfig realtimeTableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
@@ -241,7 +364,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 2);
// Rate limiter generates 1 token every 10 milliseconds, have to make it
sleep for a while.
- runQueries(70, 10L);
+ runQueries();
_queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
// Since real-time table still has the qps quota, the size of the hash map
becomes 1.
@@ -262,7 +385,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
- runQueries(70, 10L);
+ runQueries();
_queryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
@@ -278,7 +401,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
- runQueries(70, 10L);
+ runQueries();
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore,
REALTIME_TABLE_NAME);
_queryQuotaManager.processQueryRateLimitingExternalViewChange(brokerResource);
@@ -315,9 +438,8 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
}
@Test
- public void
testRealtimeTableWithNullQuotaButWithOfflineTableConfigNotNullQpsConfig()
- throws Exception {
- QuotaConfig quotaConfig = new QuotaConfig("6G", "100.00");
+ public void
testRealtimeTableWithNullQuotaButWithOfflineTableConfigNotNullQpsConfig() {
+ QuotaConfig quotaConfig = new QuotaConfig("6G", TABLE_MAX_QPS_STR);
TableConfig offlineTableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
@@ -376,37 +498,65 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
return builder.build();
}
+ private DatabaseConfig generateDefaultDatabaseConfig() {
+ return new DatabaseConfig(CommonConstants.DEFAULT_DATABASE, null);
+ }
+
+ private void setLowerDatabaseQps(DatabaseConfig databaseConfig) {
+ setDatabaseQps(databaseConfig, DATABASE_LOW_QPS_STR);
+ }
+
+ private void setHigherDatabaseQps(DatabaseConfig databaseConfig) {
+ setDatabaseQps(databaseConfig, DATABASE_HIGH_QPS_STR);
+ }
+
+ private void setDefaultDatabaseQps(String maxQps) {
+ ZKMetadataProvider.removeDatabaseConfig(_testPropertyStore,
CommonConstants.DEFAULT_DATABASE);
+
CLUSTER_CONFIG_MAP.put(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND,
maxQps);
+ _queryQuotaManager.processQueryRateLimitingClusterConfigChange();
+ }
+
+ private void setDatabaseQps(DatabaseConfig databaseConfig, String maxQps) {
+ QuotaConfig quotaConfig = new QuotaConfig(null, maxQps);
+ databaseConfig.setQuotaConfig(quotaConfig);
+ ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, databaseConfig);
+
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+ }
+
private void setQps(TableConfig tableConfig) {
- QuotaConfig quotaConfig = new QuotaConfig(null, "100.00");
+ QuotaConfig quotaConfig = new QuotaConfig(null, TABLE_MAX_QPS_STR);
tableConfig.setQuotaConfig(quotaConfig);
}
- private ExternalView generateBrokerResource(String tableName) {
+ private static ExternalView generateBrokerResource(String tableName) {
ExternalView brokerResource = new
ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
brokerResource.setState(tableName, BROKER_INSTANCE_ID, "ONLINE");
brokerResource.setState(tableName, "broker_instance_2", "OFFLINE");
return brokerResource;
}
- private void runQueries(int numOfTimesToRun, long millis)
+ private void runQueries()
throws InterruptedException {
- int count = 0;
- for (int i = 0; i < numOfTimesToRun; i++) {
- Assert.assertTrue(_queryQuotaManager.acquire(RAW_TABLE_NAME));
- count++;
- Thread.sleep(millis);
- }
- Assert.assertEquals(count, numOfTimesToRun);
+ runQueries(TABLE_MAX_QPS, false);
+ // Increase the qps so that some of the queries get throttled.
+ runQueries(TABLE_MAX_QPS * 2, true);
+ }
- //Reduce the time of sleeping and some of the queries should be throttled.
- count = 0;
- millis /= 2;
- for (int i = 0; i < numOfTimesToRun; i++) {
+ // try to keep the qps below 50 to ensure that the time lost between 2 query
runs on top of the sleepMillis
+ // is not comparable to sleepMillis, else the actual qps would end up being a lot
lower than the required qps
+ private void runQueries(double qps, boolean shouldFail)
+ throws InterruptedException {
+ int failCount = 0;
+ long sleepMillis = (long) (1000 / qps);
+ for (int i = 0; i < qps; i++) {
+ if
(!_queryQuotaManager.acquireDatabase(CommonConstants.DEFAULT_DATABASE)) {
+ failCount++;
+ }
if (!_queryQuotaManager.acquire(RAW_TABLE_NAME)) {
- count++;
+ failCount++;
}
- Thread.sleep(millis);
+ Thread.sleep(sleepMillis);
}
- Assert.assertTrue(count > 0 && count < numOfTimesToRun);
+ Assert.assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 &&
shouldFail));
}
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
index f1a6dfe33f..7dfbdd75b9 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
@@ -172,6 +172,7 @@ public class BaseSingleStageBrokerRequestHandlerTest {
when(routingManager.getRoutingTable(any(),
Mockito.anyLong())).thenReturn(rt);
QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
when(queryQuotaManager.acquire(anyString())).thenReturn(true);
+ when(queryQuotaManager.acquireDatabase(anyString())).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
long[] testRequestId = {-1};
BrokerMetrics.register(mock(BrokerMetrics.class));
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/messages/DatabaseConfigRefreshMessage.java
b/pinot-common/src/main/java/org/apache/pinot/common/messages/DatabaseConfigRefreshMessage.java
new file mode 100644
index 0000000000..5511f26b27
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/messages/DatabaseConfigRefreshMessage.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import org.apache.helix.model.Message;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+
+
+/**
+ * This (Helix) message is sent from the controller to brokers when a request
is received to update the database config.
+ */
+public class DatabaseConfigRefreshMessage extends Message {
+ public static final String REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE =
"REFRESH_DATABASE_CONFIG";
+
+ private static final String DATABASE_NAME_KEY = "databaseName";
+
+ /**
+ * Constructor for the sender.
+ */
+ public DatabaseConfigRefreshMessage(String databaseName) {
+ super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+ setMsgSubType(REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE);
+ // Give it infinite time to process the message, as long as session is
alive
+ setExecutionTimeout(-1);
+ // Set the Pinot specific fields
+ ZNRecord znRecord = getRecord();
+ znRecord.setSimpleField(DATABASE_NAME_KEY, databaseName);
+ }
+
+ /**
+ * Constructor for the receiver.
+ */
+ public DatabaseConfigRefreshMessage(Message message) {
+ super(message.getRecord());
+ if (!message.getMsgSubType().equals(REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE))
{
+ throw new IllegalArgumentException("Invalid message subtype:" +
message.getMsgSubType());
+ }
+ }
+
+ public String getDatabaseName() {
+ return getRecord().getSimpleField(DATABASE_NAME_KEY);
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 0fdf94388a..faf9b6799c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.common.metadata;
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -39,11 +41,14 @@ import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.spi.config.ConfigUtils;
+import org.apache.pinot.spi.config.DatabaseConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.user.UserConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.zookeeper.data.Stat;
@@ -62,6 +67,7 @@ public class ZKMetadataProvider {
private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS";
private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX =
"/INSTANCE_PARTITIONS";
+ private static final String PROPERTYSTORE_DATABASE_CONFIGS_PREFIX =
"/CONFIGS/DATABASE";
private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX =
"/CONFIGS/TABLE";
private static final String PROPERTYSTORE_USER_CONFIGS_PREFIX =
"/CONFIGS/USER";
private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX =
"/CONFIGS/INSTANCE";
@@ -73,6 +79,75 @@ public class ZKMetadataProvider {
propertyStore.set(constructPropertyStorePathForUserConfig(username),
znRecord, AccessOption.PERSISTENT);
}
+ /**
+ * Create database config, fail if exists.
+ *
+ * @return true if creation is successful.
+ */
+ public static boolean createDatabaseConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore,
+ DatabaseConfig databaseConfig) {
+ String databaseName = databaseConfig.getDatabaseName();
+ String databaseConfigPath =
constructPropertyStorePathForDatabaseConfig(databaseName);
+ ZNRecord databaseConfigZNRecord = toZNRecord(databaseConfig);
+ return propertyStore.create(databaseConfigPath, databaseConfigZNRecord,
AccessOption.PERSISTENT);
+ }
+
+ /**
+ * Update database config.
+ *
+ * @return true if update is successful.
+ */
+ public static boolean setDatabaseConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, DatabaseConfig databaseConfig) {
+ String databaseName = databaseConfig.getDatabaseName();
+ ZNRecord databaseConfigZNRecord = toZNRecord(databaseConfig);
+ return
propertyStore.set(constructPropertyStorePathForDatabaseConfig(databaseName),
databaseConfigZNRecord,
+ -1, AccessOption.PERSISTENT);
+ }
+
+ /**
+ * Remove database config.
+ */
+ @VisibleForTesting
+ public static void removeDatabaseConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String databaseName) {
+
propertyStore.remove(constructPropertyStorePathForDatabaseConfig(databaseName),
AccessOption.PERSISTENT);
+ }
+
+ private static ZNRecord toZNRecord(DatabaseConfig databaseConfig) {
+ ZNRecord databaseConfigZNRecord = new
ZNRecord(databaseConfig.getDatabaseName());
+ Map<String, String> simpleFields = new HashMap<>();
+ simpleFields.put(DatabaseConfig.DATABASE_NAME_KEY,
databaseConfig.getDatabaseName());
+ QuotaConfig quotaConfig = databaseConfig.getQuotaConfig();
+ if (quotaConfig != null) {
+ simpleFields.put(DatabaseConfig.QUOTA_CONFIG_KEY,
quotaConfig.toJsonString());
+ }
+ databaseConfigZNRecord.setSimpleFields(simpleFields);
+ return databaseConfigZNRecord;
+ }
+
+ @Nullable
+ private static DatabaseConfig toDatabaseConfig(@Nullable ZNRecord znRecord) {
+ if (znRecord == null) {
+ return null;
+ }
+ try {
+ Map<String, String> simpleFields = znRecord.getSimpleFields();
+
+ // Mandatory fields
+ String databaseName = simpleFields.get(DatabaseConfig.DATABASE_NAME_KEY);
+
+ // Optional fields
+ QuotaConfig quotaConfig = null;
+ String quotaConfigString =
simpleFields.get(DatabaseConfig.QUOTA_CONFIG_KEY);
+ if (quotaConfigString != null) {
+ quotaConfig = JsonUtils.stringToObject(quotaConfigString,
QuotaConfig.class);
+ }
+ return new DatabaseConfig(databaseName, quotaConfig);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while creating database config from
ZNRecord: {}", znRecord.getId(), e);
+ return null;
+ }
+ }
+
@Deprecated
public static void setTableConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String tableNameWithType,
ZNRecord znRecord) {
@@ -177,6 +252,10 @@ public class ZKMetadataProvider {
return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType);
}
+ public static String constructPropertyStorePathForDatabaseConfig(String
resourceName) {
+ return StringUtil.join("/", PROPERTYSTORE_DATABASE_CONFIGS_PREFIX,
resourceName);
+ }
+
public static String constructPropertyStorePathForResourceConfig(String
resourceName) {
return StringUtil.join("/", PROPERTYSTORE_TABLE_CONFIGS_PREFIX,
resourceName);
}
@@ -360,6 +439,12 @@ public class ZKMetadataProvider {
.collect(Collectors.toMap(UserConfig::getUsernameWithComponent, u ->
u));
}
+ @Nullable
+ public static DatabaseConfig
getDatabaseConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String
databaseName) {
+ return
toDatabaseConfig(propertyStore.get(constructPropertyStorePathForDatabaseConfig(databaseName),
null,
+ AccessOption.PERSISTENT));
+ }
+
@Nullable
public static TableConfig getTableConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String tableNameWithType) {
return
toTableConfig(propertyStore.get(constructPropertyStorePathForResourceConfig(tableNameWithType),
null,
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
index 809592bbc7..3691e0063c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
@@ -155,4 +155,18 @@ public class DatabaseUtils {
String database = databaseFromHeaders != null ? databaseFromHeaders :
databaseFromOptions;
return Objects.requireNonNullElse(database,
CommonConstants.DEFAULT_DATABASE);
}
+
+ /**
+ * Extract the database name from the prefix of fully qualified table name.
+ * If no prefix is present "default" database is returned
+ */
+ public static String extractDatabaseFromFullyQualifiedTableName(String
fullyQualifiedTableName) {
+ String[] split = StringUtils.split(fullyQualifiedTableName, '.');
+ return split.length == 1 ? CommonConstants.DEFAULT_DATABASE : split[0];
+ }
+
+ public static String extractDatabaseFromHttpHeaders(HttpHeaders headers) {
+ String databaseName = headers.getHeaderString(CommonConstants.DATABASE);
+ return databaseName == null ? CommonConstants.DEFAULT_DATABASE :
databaseName;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
index 9fd4584207..eecf2d0778 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
@@ -27,21 +27,34 @@ import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import javax.inject.Inject;
+import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
+import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.pinot.common.utils.DatabaseUtils;
+import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.config.DatabaseConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,9 +63,14 @@ import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
@Api(tags = Constants.DATABASE_TAG, authorizations = {@Authorization(value =
SWAGGER_AUTHORIZATION_KEY)})
-@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
- HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
- description = "The format of the key is ```\"Basic <token>\" or \"Bearer
<token>\"```")))
+@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = {
+ @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
+ key = SWAGGER_AUTHORIZATION_KEY,
+ description = "The format of the key is ```\"Basic <token>\" or
\"Bearer <token>\"```"),
+ @ApiKeyAuthDefinition(name = CommonConstants.DATABASE, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
+ key = CommonConstants.DATABASE,
+ description = "Database context passed through http header. If no
context is provided 'default' database "
+ + "context will be considered.")}))
@Path("/")
public class PinotDatabaseRestletResource {
public static final Logger LOGGER =
LoggerFactory.getLogger(PinotDatabaseRestletResource.class);
@@ -108,6 +126,67 @@ public class PinotDatabaseRestletResource {
}
return new DeleteDatabaseResponse(deletedTables, failedTables, dryRun);
}
+
+ /**
+ * API to update the quota configs for a database.
+ * If the database config is not present, it will be created implicitly.
+ */
+ @POST
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Path("/databases/{databaseName}/quotas")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.UPDATE_DATABASE_QUOTA)
+ @ApiOperation(value = "Update database quotas", notes = "Update database
quotas")
+ public SuccessResponse addTable(
+ @PathParam("databaseName") String databaseName,
@QueryParam("maxQueriesPerSecond") String queryQuota,
+ @Context HttpHeaders httpHeaders) {
+ if
(!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders)))
{
+ throw new ControllerApplicationException(LOGGER, "Database config name
and request context does not match",
+ Response.Status.BAD_REQUEST);
+ }
+ try {
+ DatabaseConfig databaseConfig =
_pinotHelixResourceManager.getDatabaseConfig(databaseName);
+ QuotaConfig quotaConfig = new QuotaConfig(null, queryQuota);
+ if (databaseConfig == null) {
+ databaseConfig = new DatabaseConfig(databaseName, quotaConfig);
+ _pinotHelixResourceManager.addDatabaseConfig(databaseConfig);
+ } else {
+ databaseConfig.setQuotaConfig(quotaConfig);
+ _pinotHelixResourceManager.updateDatabaseConfig(databaseConfig);
+ }
+ return new SuccessResponse("Database quotas for database config " +
databaseName + " successfully updated");
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ /**
+ * API to get database quota configs.
+ * Returns the cluster-level default quota if no database config exists, or null if the existing config has no quota
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/databases/{databaseName}/quotas")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_DATABASE_QUOTA)
+ @ApiOperation(value = "Get database quota configs", notes = "Get database
quota configs")
+ public QuotaConfig getDatabaseQuota(
+ @PathParam("databaseName") String databaseName, @Context HttpHeaders
httpHeaders) {
+ if
(!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders)))
{
+ throw new ControllerApplicationException(LOGGER, "Database config name
and request context does not match",
+ Response.Status.BAD_REQUEST);
+ }
+ DatabaseConfig databaseConfig =
_pinotHelixResourceManager.getDatabaseConfig(databaseName);
+ if (databaseConfig != null) {
+ return databaseConfig.getQuotaConfig();
+ }
+ HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
+ HelixConfigScope configScope = new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+ .forCluster(_pinotHelixResourceManager.getHelixClusterName()).build();
+ String defaultQueryQuota = helixAdmin.getConfig(configScope,
+
Collections.singletonList(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND))
+
.getOrDefault(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND, null);
+ return new QuotaConfig(null, defaultQueryQuota);
+ }
}
class DeleteDatabaseResponse {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 2b835faaae..546f4c0105 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -102,6 +102,7 @@ import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
+import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
@@ -154,6 +155,7 @@ import
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObs
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableStats;
@@ -1639,6 +1641,29 @@ public class PinotHelixResourceManager {
LOGGER.info("Successfully add user:{}", usernamePrefix);
}
+ /**
+ * Creates database config and sends out a database config refresh message.
+ * @param databaseConfig database config to be created
+ */
+ public void addDatabaseConfig(DatabaseConfig databaseConfig) {
+ if (!ZKMetadataProvider.createDatabaseConfig(_propertyStore,
databaseConfig)) {
+ throw new RuntimeException("Failed to create database config for
database: " + databaseConfig.getDatabaseName());
+ }
+ sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName());
+ }
+
+ /**
+ * Updates database config and sends out a database config refresh message.
+ * @param databaseConfig database config to be updated
+ */
+ public void updateDatabaseConfig(DatabaseConfig databaseConfig) {
+ if (!ZKMetadataProvider.setDatabaseConfig(_propertyStore, databaseConfig))
{
+ throw new RuntimeException(
+ "Failed to update database config in Zookeeper for database: " +
databaseConfig.getDatabaseName());
+ }
+ sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName());
+ }
+
/**
* Performs validations of table config and adds the table to zookeeper
* @throws InvalidTableConfigException if validations fail
@@ -2755,6 +2780,26 @@ public class PinotHelixResourceManager {
}
}
+ private void sendDatabaseConfigRefreshMessage(String databaseName) {
+ DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new
DatabaseConfigRefreshMessage(databaseName);
+
+ // Send database config refresh message to brokers
+ Criteria recipientCriteria = new Criteria();
+ recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ recipientCriteria.setInstanceName("%");
+ recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
+ recipientCriteria.setSessionSpecific(true);
+ // Send message with no callback and infinite timeout on the recipient
+ int numMessagesSent =
+ _helixZkManager.getMessagingService().send(recipientCriteria,
databaseConfigRefreshMessage, null, -1);
+ if (numMessagesSent > 0) {
+ LOGGER.info("Sent {} database config refresh messages to brokers for
database: {}", numMessagesSent,
+ databaseName);
+ } else {
+ LOGGER.warn("No database config refresh message sent to brokers for
database: {}", databaseName);
+ }
+ }
+
private void sendRoutingTableRebuildMessage(String tableNameWithType) {
RoutingTableRebuildMessage routingTableRebuildMessage = new
RoutingTableRebuildMessage(tableNameWithType);
@@ -2982,6 +3027,17 @@ public class PinotHelixResourceManager {
return _helixAdmin.getResourceExternalView(_helixClusterName,
tableNameWithType);
}
+ /**
+ * Get the database config for the given database name.
+ *
+ * @param databaseName database name
+ * @return Database config
+ */
+ @Nullable
+ public DatabaseConfig getDatabaseConfig(String databaseName) {
+ return ZKMetadataProvider.getDatabaseConfig(_propertyStore, databaseName);
+ }
+
/**
* Get the table config for the given table name with type suffix.
*
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 51cab16711..62f1b2e4d6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -70,6 +70,7 @@ public class Actions {
public static final String GET_USER = "GetUser";
public static final String GET_VERSION = "GetVersion";
public static final String GET_ZNODE = "GetZnode";
+ public static final String GET_DATABASE_QUOTA = "GetDatabaseQuota";
public static final String INGEST_FILE = "IngestFile";
public static final String RECOMMEND_CONFIG = "RecommendConfig";
public static final String RESET_SEGMENT = "ResetSegment";
@@ -86,6 +87,7 @@ public class Actions {
public static final String REBALANCE_TENANT_TABLES =
"RebalanceTenantTables";
public static final String UPDATE_TIME_INTERVAL = "UpdateTimeInterval";
public static final String UPDATE_USER = "UpdateUser";
+ public static final String UPDATE_DATABASE_QUOTA = "UpdateDatabaseQuota";
public static final String UPDATE_ZNODE = "UpdateZnode";
public static final String UPLOAD_SEGMENT = "UploadSegment";
public static final String GET_INSTANCE_PARTITIONS =
"GetInstancePartitions";
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
new file mode 100644
index 0000000000..d1fb956f2c
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.net.URI;
+import java.util.Properties;
+import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
+import
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManagerTest;
+import org.apache.pinot.client.ConnectionFactory;
+import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
+import org.apache.pinot.client.PinotClientException;
+import org.apache.pinot.client.ResultSetGroup;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.spi.config.table.QuotaConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.client.Connection.FAIL_ON_EXCEPTIONS;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * This test suite is focused only on validating that the config changes are
propagated properly as expected.
+ * Validations around different cases arising from cluster config, database
config and table config are extensively
+ * tested as part of {@link HelixExternalViewBasedQueryQuotaManagerTest}
+ */
+public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest {
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBrokers(1);
+ startServers(1);
+
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createOfflineTableConfig();
+ addTableConfig(tableConfig);
+
+ Properties properties = new Properties();
+ properties.put(FAIL_ON_EXCEPTIONS, "FALSE");
+ _pinotConnection = ConnectionFactory.fromZookeeper(properties, getZkUrl()
+ "/" + getHelixClusterName(),
+ new
JsonAsyncHttpPinotClientTransportFactory().withConnectionProperties(getPinotConnectionProperties())
+ .buildTransport());
+ }
+
+ @AfterMethod
+ void resetQuotas()
+ throws Exception {
+ addQueryQuotaToClusterConfig(null);
+ addQueryQuotaToDatabaseConfig(null);
+ addQueryQuotaToTableConfig(null);
+ }
+
+ @Test
+ public void testDefaultDatabaseQueryQuota()
+ throws Exception {
+ addQueryQuotaToClusterConfig(40);
+ testQueryRate(40);
+ }
+
+ @Test
+ public void testDatabaseConfigQueryQuota()
+ throws Exception {
+ addQueryQuotaToDatabaseConfig(10);
+ testQueryRate(10);
+ }
+
+ @Test
+ public void testDefaultDatabaseQueryQuotaOverride()
+ throws Exception {
+ addQueryQuotaToClusterConfig(25);
+ // override lower than default quota
+ addQueryQuotaToDatabaseConfig(10);
+ testQueryRate(10);
+ // override higher than default quota
+ addQueryQuotaToDatabaseConfig(40);
+ testQueryRate(40);
+ }
+
+ @Test
+ public void testDatabaseQueryQuotaWithTableQueryQuota()
+ throws Exception {
+ addQueryQuotaToDatabaseConfig(25);
+ // table quota within database quota. Queries should fail upon table quota
(10 qps) breach
+ addQueryQuotaToTableConfig(10);
+ testQueryRate(10);
+ // table quota more than database quota. Queries should fail upon database
quota (25 qps) breach
+ addQueryQuotaToTableConfig(50);
+ testQueryRate(25);
+ }
+
+ @Test
+ public void testDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker()
+ throws Exception {
+ BaseBrokerStarter brokerStarter = null;
+ try {
+ addQueryQuotaToDatabaseConfig(25);
+ addQueryQuotaToTableConfig(10);
+ // Add one more broker such that quota gets distributed equally among
them
+ brokerStarter = startOneBroker(2);
+ // to allow change propagation to QueryQuotaManager
+ Thread.sleep(1000);
+ testQueryRate(10);
+ // drop table level quota so that database quota comes into effect
+ addQueryQuotaToTableConfig(null);
+ testQueryRate(25);
+ } finally {
+ if (brokerStarter != null) {
+ brokerStarter.stop();
+ }
+ }
+ }
+
+ /**
+ * Runs the query load with the max rate that the quota can allow and
ensures queries are not failing.
+ * Then runs the query load with double the max rate and expects queries to
fail due to quota breach.
+ * @param maxRate max rate allowed by the quota
+ */
+ void testQueryRate(int maxRate)
+ throws Exception {
+ runQueries(maxRate, false);
+ //increase the qps and some of the queries should be throttled.
+ runQueries(maxRate * 2, true);
+ }
+
+ // try to keep the qps below 50 to ensure that the time lost between 2 query
runs on top of the sleepMillis
+ // is not comparable to sleepMillis, else the actual qps would end up being
much lower than required qps
+ private void runQueries(double qps, boolean shouldFail)
+ throws Exception {
+ int failCount = 0;
+ long sleepMillis = (long) (1000 / qps);
+ for (int i = 0; i < qps * 2; i++) {
+ ResultSetGroup resultSetGroup = _pinotConnection.execute("SELECT
COUNT(*) FROM " + getTableName());
+ for (PinotClientException exception : resultSetGroup.getExceptions()) {
+ if (exception.getMessage().contains("QuotaExceededError")) {
+ failCount++;
+ break;
+ }
+ }
+ Thread.sleep(sleepMillis);
+ }
+ assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 &&
shouldFail));
+ }
+
+
+ public void addQueryQuotaToTableConfig(Integer maxQps)
+ throws Exception {
+ TableConfig tableConfig = getOfflineTableConfig();
+ tableConfig.setQuotaConfig(new QuotaConfig(null, maxQps == null ? null :
maxQps.toString()));
+ updateTableConfig(tableConfig);
+ // to allow change propagation to QueryQuotaManager
+ Thread.sleep(1000);
+ }
+
+ public void addQueryQuotaToDatabaseConfig(Integer maxQps)
+ throws Exception {
+ String url = _controllerRequestURLBuilder.getBaseUrl() +
"/databases/default/quotas";
+ if (maxQps != null) {
+ url += "?maxQueriesPerSecond=" + maxQps;
+ }
+ HttpClient.wrapAndThrowHttpException(_httpClient.sendPostRequest(new
URI(url), null, null));
+ // to allow change propagation to QueryQuotaManager
+ Thread.sleep(1000);
+ }
+
+ public void addQueryQuotaToClusterConfig(Integer maxQps)
+ throws Exception {
+ if (maxQps == null) {
+ HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(new
URI(
+ _controllerRequestURLBuilder.forClusterConfigs() + "/"
+ + CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND)));
+ } else {
+ String payload = "{\"" +
CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND + "\":\"" + maxQps +
"\"}";
+ HttpClient.wrapAndThrowHttpException(
+ _httpClient.sendJsonPostRequest(new
URI(_controllerRequestURLBuilder.forClusterConfigs()), payload));
+ }
+ // to allow change propagation to QueryQuotaManager
+ Thread.sleep(1000);
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/DatabaseConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/DatabaseConfig.java
new file mode 100644
index 0000000000..c615fa5488
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/DatabaseConfig.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.QuotaConfig;
+
+
+public class DatabaseConfig extends BaseJsonConfig {
+ public static final String QUOTA_CONFIG_KEY = "quota";
+ public static final String DATABASE_NAME_KEY = "databaseName";
+ private String _databaseName;
+ @JsonPropertyDescription("Resource quota associated with this database")
+ private QuotaConfig _quotaConfig;
+
+ public DatabaseConfig(String databaseName, QuotaConfig quotaConfig) {
+ _databaseName = databaseName;
+ _quotaConfig = quotaConfig;
+ }
+
+ public String getDatabaseName() {
+ return _databaseName;
+ }
+
+ public void setDatabaseName(String databaseName) {
+ _databaseName = databaseName;
+ }
+
+ @JsonProperty(QUOTA_CONFIG_KEY)
+ @Nullable
+ public QuotaConfig getQuotaConfig() {
+ return _quotaConfig;
+ }
+
+ public void setQuotaConfig(QuotaConfig quotaConfig) {
+ _quotaConfig = quotaConfig;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index a069031b69..6769ae1894 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -78,6 +78,7 @@ public class CommonConstants {
public static final String IS_SHUTDOWN_IN_PROGRESS = "shutdownInProgress";
public static final String QUERIES_DISABLED = "queriesDisabled";
public static final String QUERY_RATE_LIMIT_DISABLED =
"queryRateLimitDisabled";
+ public static final String DATABASE_MAX_QUERIES_PER_SECOND =
"databaseMaxQueriesPerSecond";
public static final String INSTANCE_CONNECTED_METRIC_NAME =
"helix.connected";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]