yashmayya commented on code in PR #13544:
URL: https://github.com/apache/pinot/pull/13544#discussion_r1677642007
##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -214,6 +216,15 @@ public void addInstanceConfigChangeHandler(ClusterChangeHandler instanceConfigCh
_instanceConfigChangeHandlers.add(instanceConfigChangeHandler);
}
+ /**
+ * Adds a cluster config change handler to handle Helix cluster config change callbacks.
+ * <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change
+ * handlers from running. For slow change handler, make it asynchronous.
+ */
+ public void addClusterConfigChangeHandler(ClusterChangeHandler clusterConfigChangeHandler) {
Review Comment:
Where is this method called?
##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java:
##########
@@ -129,6 +135,32 @@ public void onError(Exception e, ErrorCode code, ErrorType type) {
}
}
+ private class RefreshDatabaseConfigMessageHandler extends MessageHandler {
+ final String _databaseName;
+
+ RefreshDatabaseConfigMessageHandler(DatabaseConfigRefreshMessage databaseConfigRefreshMessage,
+ NotificationContext context) {
+ super(databaseConfigRefreshMessage, context);
+ _databaseName = databaseConfigRefreshMessage.getDatabaseName();
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() {
+ // only update the existing rate limiter.
+ // Database rate limiter creation should only be done through table based change triggers
+ _queryQuotaManager.updateDatabaseRateLimiter(_databaseName);
Review Comment:
Can this cause a race condition between table creation and database quota
config updates, resulting in a stale quota config being used?
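To make the concern concrete, here is one interleaving that would leave a stale
value behind. This is a hypothetical model rather than Pinot code: plain maps
stand in for the ZK database config and the rate limiter map, and the class and
field names are made up. Whether this ordering can actually occur depends on
which threads run the table-creation path and the message handler.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model, not Pinot code: maps stand in for ZK config and the limiter map.
final class StaleQuotaInterleaving {
  final Map<String, Double> zkConfig = new ConcurrentHashMap<>();     // database -> configured QPS
  final Map<String, Double> rateLimiters = new ConcurrentHashMap<>(); // database -> QPS in use

  // Mirrors the shape of updateDatabaseRateLimiter: refresh only if a limiter already exists.
  void updateDatabaseRateLimiter(String database) {
    if (!rateLimiters.containsKey(database)) {
      return;
    }
    rateLimiters.put(database, zkConfig.get(database));
  }

  // Replays one possible interleaving step by step on a single thread.
  static StaleQuotaInterleaving replay() {
    StaleQuotaInterleaving s = new StaleQuotaInterleaving();
    s.zkConfig.put("db1", 100.0);
    double readByTableCreation = s.zkConfig.get("db1"); // 1. table-creation path reads the config
    s.zkConfig.put("db1", 200.0);                       // 2. the quota is updated
    s.updateDatabaseRateLimiter("db1");                 // 3. refresh message: no limiter yet, no-op
    s.rateLimiters.put("db1", readByTableCreation);     // 4. table-creation path installs the old value
    return s;
  }
}
```

After `replay()`, the limiter holds 100 while the config says 200, and nothing
triggers another refresh until the next config or table change.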
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -331,6 +331,12 @@ private TableAuthorizationResult hasTableAccess(RequesterIdentity requesterIdent
* Returns true if the QPS quota of the tables has exceeded.
*/
private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext requestContext) {
+ String database = DatabaseUtils.extractDatabaseFromTableName(tableNames.iterator().next());
Review Comment:
The `database` has already been extracted from the request when this method
is called - probably better to pass that in rather than re-extracting it from
the table name?
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -448,6 +448,14 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
}
// Validate QPS quota
+ String database = DatabaseUtils.extractDatabaseFromTableName(tableName);
+ if (!_queryQuotaManager.acquireDatabase(database)) {
+ String errorMessage =
+ String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database);
+ LOGGER.info(errorMessage);
Review Comment:
nit: this could use parameterized logging instead of `String.format`
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -230,6 +248,72 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br
}
}
+ /**
+ * Updates the database rate limiter if it already exists. Will not create a new database rate limiter.
+ * @param databaseName database name for which rate limiter needs to be updated
+ */
+ public void updateDatabaseRateLimiter(String databaseName) {
+ if (!_databaseRateLimiterMap.containsKey(databaseName)) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
+ }
+
+ private void createOrUpdateDatabaseRateLimiter(List<String> databaseNames) {
+ double databaseQpsQuota = _defaultQpsQuotaForDatabase;
+ ExternalView brokerResource = HelixHelper
+ .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(),
+ CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ // Tables in database can span across broker tags as we don't maintain a broker tag to database mapping as of now.
+ // Hence, we consider all online brokers for the rate distribution.
+ int onlineBrokers = HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size();
+ for (String databaseName : databaseNames) {
+ DatabaseConfig databaseConfig =
+ ZKMetadataProvider.getDatabaseConfig(_helixManager.getHelixPropertyStore(), databaseName);
+ if (databaseConfig != null && databaseConfig.getQuotaConfig() != null
+ && databaseConfig.getQuotaConfig().getMaxQPS() != -1) {
+ databaseQpsQuota = databaseConfig.getQuotaConfig().getMaxQPS();
+ }
+ if (databaseQpsQuota < 0) {
+ buildEmptyOrResetRateLimiterInDatabaseQueryQuotaEntity(databaseName);
+ continue;
+ }
+ double perBrokerQpsQuota = databaseQpsQuota / onlineBrokers;
Review Comment:
I think this could lead to extremely skewed scenarios, right? Consider a
large cluster with, say, 10 brokers and a database that has a single table
whose broker tenant tag applies to only one broker. In this case, that broker
gets the entire table-level query quota but only 10% of the database query
quota, which might well be less than the table-level query quota.
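A rough sketch of the arithmetic, with made-up numbers (database quota of 100
QPS, table quota of 50 QPS). The class and method names are illustrative only,
and the table-side division assumes the table quota is split across just the
brokers that serve the table, as described above.

```java
// Hypothetical numbers and names; this only illustrates the division in the diff.
final class QuotaSkewExample {
  // Mirrors the diff: the database quota is split across ALL online brokers.
  static double perBrokerDatabaseQuota(double databaseQpsQuota, int onlineBrokers) {
    return databaseQpsQuota / onlineBrokers;
  }

  // Assumption: the table quota is split only across brokers serving the table's tenant.
  static double perBrokerTableQuota(double tableQpsQuota, int brokersServingTable) {
    return tableQpsQuota / brokersServingTable;
  }

  public static void main(String[] args) {
    double database = perBrokerDatabaseQuota(100.0, 10); // 10 brokers in the cluster
    double table = perBrokerTableQuota(50.0, 1);         // 1 broker tagged for the table
    System.out.println("per-broker database quota: " + database);
    System.out.println("per-broker table quota: " + table);
    // The stricter of the two limits is what queries on that broker actually hit.
    System.out.println("effective per-broker limit: " + Math.min(database, table));
  }
}
```

With these numbers the single broker is capped by its share of the database
quota, well below both the configured table quota and the configured database
quota.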
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java:
##########
@@ -155,4 +157,14 @@ public static String extractDatabaseFromQueryRequest(
String database = databaseFromHeaders != null ? databaseFromHeaders : databaseFromOptions;
return Objects.requireNonNullElse(database, CommonConstants.DEFAULT_DATABASE);
}
+
+ public static String extractDatabaseFromTableName(String tableName) {
Review Comment:
```suggestion
public static String extractDatabaseFromFullyQualifiedTableName(String tableName) {
```
nit: to make it more clear. Also could you please add a small Javadoc here?
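Something along these lines is what I had in mind for the Javadoc. The method
body is not visible in this hunk, so the implementation below is only a guess
at the behavior (split on the first `.`), and `"default"` is an assumed value
for `CommonConstants.DEFAULT_DATABASE`.

```java
// Sketch only: the body and the DEFAULT_DATABASE value are assumptions, not the PR's code.
final class DatabaseUtilsSketch {
  static final String DEFAULT_DATABASE = "default"; // assumed value of CommonConstants.DEFAULT_DATABASE

  /**
   * Extracts the database name from a fully qualified table name of the form
   * {@code <databaseName>.<tableName>}.
   *
   * @param fullyQualifiedTableName table name, optionally prefixed with a database name
   * @return the database prefix, or the default database when the name has no prefix
   */
  static String extractDatabaseFromFullyQualifiedTableName(String fullyQualifiedTableName) {
    int dot = fullyQualifiedTableName.indexOf('.');
    return dot > 0 ? fullyQualifiedTableName.substring(0, dot) : DEFAULT_DATABASE;
  }
}
```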
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -340,7 +435,7 @@ private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity query
// Emit the qps capacity utilization rate.
int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
if (!rateLimiter.tryAcquire()) {
- LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. Current qps: {}", tableNameWithType,
+ LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate: {}. Current qps: {}", resourceName,
Review Comment:
I think it probably makes more sense to separate the log lines for table and
database query quota limits to make it more clear to users whether the table or
the database query quota has been exceeded?
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -458,6 +566,27 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke
numRebuilt, _rateLimiterMap.size());
}
+ /**
+ * Process query quota state change when cluster config gets changed
+ */
+ public void processQueryRateLimitingClusterConfigChange() {
+ double oldDatabaseQpsQuota = _defaultQpsQuotaForDatabase;
+ getDefaultQueryQuotaForDatabase();
+ if (oldDatabaseQpsQuota == _defaultQpsQuotaForDatabase) {
+ return;
+ }
+ createOrUpdateDatabaseRateLimiter(new ArrayList<>(_databaseRateLimiterMap.keySet()));
+ }
+
+ private void getDefaultQueryQuotaForDatabase() {
Review Comment:
IMO this part is not as explicit / unambiguous to read as it could be - WDYT
about making `getDefaultQueryQuotaForDatabase` return the default value and
setting `_defaultQpsQuotaForDatabase` at the method's call-sites rather than
within the method itself?
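Roughly the shape I mean, as a sketch. A plain map stands in for the Helix
cluster config, the config key is hypothetical, and the `boolean` return on the
change handler is only there to make the sketch easy to exercise (the real
method returns `void`).

```java
import java.util.Map;

// Hypothetical stand-in for the quota manager; only the shape of the refactor matters.
final class DefaultQuotaRefactorSketch {
  private final Map<String, String> _clusterConfig; // stand-in for the Helix cluster config
  private double _defaultQpsQuotaForDatabase;

  DefaultQuotaRefactorSketch(Map<String, String> clusterConfig) {
    _clusterConfig = clusterConfig;
    // The assignment is visible at the call site.
    _defaultQpsQuotaForDatabase = getDefaultQueryQuotaForDatabase();
  }

  // Pure read: returns the configured default, or -1 when nothing is configured.
  private double getDefaultQueryQuotaForDatabase() {
    String value = _clusterConfig.get("databaseQueryRateLimit"); // hypothetical key
    return value != null ? Double.parseDouble(value) : -1;
  }

  // Returns true when the default changed and the limiters would need rebuilding.
  boolean processQueryRateLimitingClusterConfigChange() {
    double newDatabaseQpsQuota = getDefaultQueryQuotaForDatabase();
    if (newDatabaseQpsQuota == _defaultQpsQuotaForDatabase) {
      return false;
    }
    _defaultQpsQuotaForDatabase = newDatabaseQpsQuota;
    // createOrUpdateDatabaseRateLimiter(...) would be called here.
    return true;
  }

  double defaultQpsQuotaForDatabase() {
    return _defaultQpsQuotaForDatabase;
  }
}
```

This way the old and new values are both plain locals in the change handler,
and the getter has no side effects.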
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -340,7 +435,7 @@ private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity query
// Emit the qps capacity utilization rate.
int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
if (!rateLimiter.tryAcquire()) {
- LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. Current qps: {}", tableNameWithType,
+ LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate: {}. Current qps: {}", resourceName,
Review Comment:
Ah never mind, just noticed that we do that in the broker request handler
anyway, so this isn't too important here.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java:
##########
@@ -46,6 +55,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.spi.utils.CommonConstants.Helix.DATABASE_QUERY_RATE_LIMIT;
+
/**
* This class is to support the qps quota feature.
Review Comment:
nit: this Javadoc should probably be updated - currently it states that `it
only gets updated when a new table added or a broker restarted`.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java:
##########
@@ -107,6 +120,61 @@ public DeleteDatabaseResponse deleteTablesInDatabase(
}
return new DeleteDatabaseResponse(deletedTables, failedTables, dryRun);
}
+
+ /**
+ * API to update the quota configs for database
+ * If database config is not present it will be created implicitly
+ */
+ @POST
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Path("/databases/{databaseName}/quotas")
+ @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_DATABASE_QUOTA)
+ @ApiOperation(value = "Update database quotas", notes = "Update database quotas")
+ public SuccessResponse addTable(
+ @PathParam("databaseName") String databaseName, @QueryParam("maxQueriesPerSecond") String queryQuota,
+ @Context HttpHeaders httpHeaders) {
+ if (!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders))) {
+ throw new ControllerApplicationException(LOGGER, "Database config name and request context does not match",
+ Response.Status.BAD_REQUEST);
+ }
+ try {
+ DatabaseConfig databaseConfig = _pinotHelixResourceManager.getDatabaseConfig(databaseName);
+ QuotaConfig quotaConfig = new QuotaConfig(null, queryQuota);
+ if (databaseConfig == null) {
+ databaseConfig = new DatabaseConfig(databaseName, quotaConfig);
+ _pinotHelixResourceManager.addDatabaseConfig(databaseConfig);
+ } else {
+ databaseConfig.setQuotaConfig(quotaConfig);
+ _pinotHelixResourceManager.updateDatabaseConfig(databaseConfig);
+ }
+ return new SuccessResponse("Database quotas for database config " + databaseName + " successfully updated");
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ /**
+ * API to get database quota configs.
+ * Will return null if database config is not defined or database quotas are not defined
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/databases/{databaseName}/quotas")
+ @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_DATABASE_QUOTA)
+ @ApiOperation(value = "Get database quota configs", notes = "Get database quota configs")
+ public QuotaConfig getDatabaseQuota(
+ @PathParam("databaseName") String databaseName, @Context HttpHeaders httpHeaders) {
+ if (!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders))) {
+ throw new ControllerApplicationException(LOGGER, "Database config name and request context does not match",
+ Response.Status.BAD_REQUEST);
+ }
+ DatabaseConfig databaseConfig = _pinotHelixResourceManager.getDatabaseConfig(databaseName);
+ if (databaseConfig != null) {
+ return databaseConfig.getQuotaConfig();
+ }
+ return new QuotaConfig(null, null);
Review Comment:
Should we return the default database query quota here instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]