This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d6732ffd19 Logical table query quota enforcement - SSE (#15839)
d6732ffd19 is described below
commit d6732ffd1943a6a11358ad36645e07769e1c84ff
Author: Abhishek Bafna <[email protected]>
AuthorDate: Tue May 27 12:57:28 2025 +0530
Logical table query quota enforcement - SSE (#15839)
---
...okerResourceOnlineOfflineStateModelFactory.java | 1 +
.../BrokerUserDefinedMessageHandlerFactory.java | 1 +
.../HelixExternalViewBasedQueryQuotaManager.java | 117 +++++++++++++--------
.../pinot/broker/queryquota/QueryQuotaManager.java | 7 ++
.../requesthandler/BaseBrokerRequestHandler.java | 3 +-
.../BaseSingleStageBrokerRequestHandler.java | 23 ++--
.../pinot/common/metadata/ZKMetadataProvider.java | 5 +-
.../controller/helix/ControllerRequestClient.java | 23 ++++
.../helix/core/PinotHelixResourceManager.java | 9 +-
.../pinot/controller/helix/ControllerTest.java | 12 ++-
.../tests/QueryQuotaClusterIntegrationTest.java | 94 ++++++++++++++++-
.../BaseLogicalTableIntegrationTest.java | 34 +++---
...hTwoOfflineOneRealtimeTableIntegrationTest.java | 2 +-
13 files changed, 241 insertions(+), 90 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index cc2f2f406a..d9e83d4e4c 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -80,6 +80,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory
extends StateModelFact
try {
if (ZKMetadataProvider.isLogicalTableExists(_propertyStore,
physicalOrLogicalTable)) {
_routingManager.buildRoutingForLogicalTable(physicalOrLogicalTable);
+
_queryQuotaManager.initOrUpdateLogicalTableQueryQuota(physicalOrLogicalTable);
} else {
_routingManager.buildRouting(physicalOrLogicalTable);
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, physicalOrLogicalTable);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index 033a126ea6..81ea3d0d4f 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -154,6 +154,7 @@ public class BrokerUserDefinedMessageHandlerFactory
implements MessageHandlerFac
@Override
public HelixTaskResult handleMessage() {
_routingManager.buildRoutingForLogicalTable(_logicalTableName);
+ _queryQuotaManager.initOrUpdateLogicalTableQueryQuota(_logicalTableName);
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
return result;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 925fbc4860..7bb8641271 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -49,6 +49,7 @@ import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.zookeeper.data.Stat;
@@ -175,6 +176,20 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
}
}
+ public void initOrUpdateLogicalTableQueryQuota(String logicalTableName) {
+ LogicalTableConfig logicalTableConfig =
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName);
+ if (logicalTableConfig == null) {
+ LOGGER.info("No query quota to update since logical table config is
null");
+ return;
+ }
+
+ LOGGER.info("Initializing rate limiter for logical table {}",
logicalTableName);
+
+ ExternalView brokerResourceEV = getBrokerResource();
+ Stat stat =
_propertyStore.getStat(constructLogicalTableConfigPath(logicalTableName),
AccessOption.PERSISTENT);
+ createOrUpdateRateLimiter(logicalTableName, brokerResourceEV,
logicalTableConfig.getQuotaConfig(), stat);
+ }
+
public void initOrUpdateTableQueryQuota(String tableNameWithType) {
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
ExternalView brokerResourceEV = getBrokerResource();
@@ -195,24 +210,25 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
LOGGER.info("Initializing rate limiter for table {}", tableNameWithType);
// Create rate limiter if query quota config is specified.
- createOrUpdateRateLimiter(tableNameWithType, brokerResourceEV,
tableConfig.getQuotaConfig());
+ Stat stat =
_propertyStore.getStat(constructTableConfigPath(tableNameWithType),
AccessOption.PERSISTENT);
+ createOrUpdateRateLimiter(tableNameWithType, brokerResourceEV,
tableConfig.getQuotaConfig(), stat);
}
/**
* Drop table query quota.
- * @param tableNameWithType table name with type.
+ * @param physicalOrLogicalTable physical or logical table name.
*/
- public void dropTableQueryQuota(String tableNameWithType) {
- LOGGER.info("Dropping rate limiter for table {}", tableNameWithType);
- removeRateLimiter(tableNameWithType);
+ public void dropTableQueryQuota(String physicalOrLogicalTable) {
+ LOGGER.info("Dropping rate limiter for table {}", physicalOrLogicalTable);
+ removeRateLimiter(physicalOrLogicalTable);
}
/** Remove or update rate limiter if another table with the same raw table
name but different type is still using
* the quota config.
- * @param tableNameWithType table name with type
+ * @param physicalOrLogicalTable physical or logical table name.
*/
- private void removeRateLimiter(String tableNameWithType) {
- _rateLimiterMap.remove(tableNameWithType);
+ private void removeRateLimiter(String physicalOrLogicalTable) {
+ _rateLimiterMap.remove(physicalOrLogicalTable);
}
/**
@@ -230,26 +246,27 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
/**
* Create or update a rate limiter for a table.
- * @param tableNameWithType table name with table type.
+ * @param physicalOrLogicalTableName physical or logical table name.
* @param brokerResource broker resource which stores all the broker states
of each table.
* @param quotaConfig quota config of the table.
+ * @param tableStat stat of the table config.
*/
- private void createOrUpdateRateLimiter(String tableNameWithType,
ExternalView brokerResource,
- QuotaConfig quotaConfig) {
+ private void createOrUpdateRateLimiter(String physicalOrLogicalTableName,
ExternalView brokerResource,
+ QuotaConfig quotaConfig, Stat tableStat) {
if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
- LOGGER.info("No qps config specified for table: {}", tableNameWithType);
- buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType);
+ LOGGER.info("No qps config specified for table: {}",
physicalOrLogicalTableName);
+
buildEmptyOrResetRateLimiterInQueryQuotaEntity(physicalOrLogicalTableName);
return;
}
if (brokerResource == null) {
- LOGGER.warn("Failed to init qps quota for table {}. No broker resource
connected!", tableNameWithType);
+ LOGGER.warn("Failed to init qps quota for table {}. No broker resource
connected!", physicalOrLogicalTableName);
// It could be possible that brokerResourceEV is null due to ZK
connection issue.
// In this case, the rate limiter should not be reset. Simply exit the
method would be sufficient.
return;
}
- Map<String, String> stateMap =
brokerResource.getStateMap(tableNameWithType);
+ Map<String, String> stateMap =
brokerResource.getStateMap(physicalOrLogicalTableName);
int otherOnlineBrokerCount = 0;
// If stateMap is null, that means this broker is the first broker for
this table.
@@ -263,26 +280,22 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
}
int onlineCount = otherOnlineBrokerCount + 1;
- LOGGER.info("The number of online brokers for table {} is {}",
tableNameWithType, onlineCount);
+ LOGGER.info("The number of online brokers for table {} is {}",
physicalOrLogicalTableName, onlineCount);
// Get the dynamic rate
double overallRate = quotaConfig.getMaxQPS();
-
- // Get stat from property store
- String tableConfigPath = constructTableConfigPath(tableNameWithType);
- Stat stat = _propertyStore.getStat(tableConfigPath,
AccessOption.PERSISTENT);
double perBrokerRate = overallRate / onlineCount;
- QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType);
+ QueryQuotaEntity queryQuotaEntity =
_rateLimiterMap.get(physicalOrLogicalTableName);
if (queryQuotaEntity == null) {
queryQuotaEntity =
new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
- new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
onlineCount, overallRate, stat.getVersion());
- _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
+ new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
onlineCount, overallRate, tableStat.getVersion());
+ _rateLimiterMap.put(physicalOrLogicalTableName, queryQuotaEntity);
LOGGER.info(
"Rate limiter for table: {} has been initialized. Overall rate: {}.
Per-broker rate: {}. Number of online "
- + "broker instances: {}. Table config stat version: {}",
tableNameWithType, overallRate, perBrokerRate,
- onlineCount, stat.getVersion());
+ + "broker instances: {}. Table config stat version: {}",
physicalOrLogicalTableName, overallRate,
+ perBrokerRate, onlineCount, tableStat.getVersion());
} else {
RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
double previousRate = -1;
@@ -297,14 +310,14 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
}
queryQuotaEntity.setNumOnlineBrokers(onlineCount);
queryQuotaEntity.setOverallRate(overallRate);
- queryQuotaEntity.setTableConfigStatVersion(stat.getVersion());
+ queryQuotaEntity.setTableConfigStatVersion(tableStat.getVersion());
LOGGER.info(
"Rate limiter for table: {} has been updated. Overall rate: {}.
Previous per-broker rate: {}. New "
+ "per-broker rate: {}. Number of online broker instances: {}.
Table config stat version: {}",
- tableNameWithType, overallRate, previousRate, perBrokerRate,
onlineCount, stat.getVersion());
+ physicalOrLogicalTableName, overallRate, previousRate,
perBrokerRate, onlineCount, tableStat.getVersion());
}
- addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType,
queryQuotaEntity);
- addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType,
queryQuotaEntity);
+ addMaxBurstQPSCallbackTableGaugeIfNeeded(physicalOrLogicalTableName,
queryQuotaEntity);
+
addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(physicalOrLogicalTableName,
queryQuotaEntity);
if (isQueryRateLimitDisabled()) {
LOGGER.info("Query rate limiting is currently disabled for this broker.
So it won't take effect immediately.");
}
@@ -536,19 +549,19 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
* Build an empty rate limiter in the new query quota entity, or set the
rate limiter to null in an existing query
* quota entity.
*/
- private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String
tableNameWithType) {
- QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType);
+ private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String
physicalOrLogicalTableName) {
+ QueryQuotaEntity queryQuotaEntity =
_rateLimiterMap.get(physicalOrLogicalTableName);
if (queryQuotaEntity == null) {
// Create an QueryQuotaEntity object without setting a rate limiter.
queryQuotaEntity = new QueryQuotaEntity(null, new
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0);
- _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
+ _rateLimiterMap.put(physicalOrLogicalTableName, queryQuotaEntity);
} else {
// Set rate limiter to null for an existing QueryQuotaEntity object.
queryQuotaEntity.setRateLimiter(null);
}
- addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType,
queryQuotaEntity);
- addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType,
queryQuotaEntity);
+ addMaxBurstQPSCallbackTableGaugeIfNeeded(physicalOrLogicalTableName,
queryQuotaEntity);
+
addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(physicalOrLogicalTableName,
queryQuotaEntity);
}
/**
@@ -680,6 +693,16 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
return offlineQuotaOk && realtimeQuotaOk;
}
+ @Override
+ public boolean acquireLogicalTable(String logicalTableName) {
+ QueryQuotaEntity logicalTableQueryQuotaEntity =
_rateLimiterMap.get(logicalTableName);
+ if (logicalTableQueryQuotaEntity != null) {
+ LOGGER.debug("Trying to acquire token for logical table: {}",
logicalTableName);
+ return tryAcquireToken(logicalTableName, logicalTableQueryQuotaEntity);
+ }
+ return true;
+ }
+
/**
* Try to acquire token from rate limiter. Emit the utilization of the qps
quota if broker metric isn't null.
* @param resourceName resource name to acquire.
@@ -749,7 +772,7 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
int numRebuilt = 0;
for (Iterator<Map.Entry<String, QueryQuotaEntity>> it =
_rateLimiterMap.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, QueryQuotaEntity> entry = it.next();
- String tableNameWithType = entry.getKey();
+ String physicalOrLogicalTableName = entry.getKey();
QueryQuotaEntity queryQuotaEntity = entry.getValue();
if (queryQuotaEntity.getRateLimiter() == null) {
// No rate limiter set, skip this table.
@@ -757,9 +780,9 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
}
// Get number of online brokers.
- Map<String, String> stateMap =
currentBrokerResourceEV.getStateMap(tableNameWithType);
+ Map<String, String> stateMap =
currentBrokerResourceEV.getStateMap(physicalOrLogicalTableName);
if (stateMap == null) {
- LOGGER.info("No broker resource for Table {}. Removing its rate
limit.", tableNameWithType);
+ LOGGER.info("No broker resource for Table {}. Removing its rate
limit.", physicalOrLogicalTableName);
it.remove();
continue;
}
@@ -773,10 +796,14 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
int onlineBrokerCount = otherOnlineBrokerCount + 1;
// Get stat from property store
- String tableConfigPath = constructTableConfigPath(tableNameWithType);
- Stat stat = _propertyStore.getStat(tableConfigPath,
AccessOption.PERSISTENT);
+ String physicalOrLogicalTableConfigPath =
+ ZKMetadataProvider.isTableConfigExists(_propertyStore,
physicalOrLogicalTableName)
+ ? constructTableConfigPath(physicalOrLogicalTableName)
+ : constructLogicalTableConfigPath(physicalOrLogicalTableName);
+ Stat stat = _propertyStore.getStat(physicalOrLogicalTableConfigPath,
AccessOption.PERSISTENT);
if (stat == null) {
- LOGGER.info("Table {} has been deleted from property store. Removing
its rate limit.", tableNameWithType);
+ LOGGER.info("Table {} has been deleted from property store. Removing
its rate limit.",
+ physicalOrLogicalTableName);
it.remove();
continue;
}
@@ -790,10 +817,10 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
double overallRate;
// Get latest quota config only if stat don't match.
if (stat.getVersion() != queryQuotaEntity.getTableConfigStatVersion()) {
- QuotaConfig quotaConfig =
getQuotaConfigFromPropertyStore(tableNameWithType);
+ QuotaConfig quotaConfig =
getQuotaConfigFromPropertyStore(physicalOrLogicalTableName);
if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() ==
null) {
LOGGER.info("No query quota config or the config is invalid for
Table {}. Removing its rate limit.",
- tableNameWithType);
+ physicalOrLogicalTableName);
it.remove();
continue;
}
@@ -810,7 +837,7 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
queryQuotaEntity.setTableConfigStatVersion(stat.getVersion());
LOGGER.info("Rate limiter for table: {} has been updated. Overall
rate: {}. Previous per-broker rate: {}. New "
+ "per-broker rate: {}. Number of online broker instances: {}.
Table config stat version: {}.",
- tableNameWithType, overallRate, previousRate, latestRate,
onlineBrokerCount, stat.getVersion());
+ physicalOrLogicalTableName, overallRate, previousRate, latestRate,
onlineBrokerCount, stat.getVersion());
numRebuilt++;
}
}
@@ -931,4 +958,8 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
private String constructTableConfigPath(String tableNameWithType) {
return "/CONFIGS/TABLE/" + tableNameWithType;
}
+
+ private String constructLogicalTableConfigPath(String tableName) {
+ return "/LOGICAL/TABLE/" + tableName;
+ }
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
index 70c3ef7588..eb3de472d9 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
@@ -27,6 +27,13 @@ public interface QueryQuotaManager {
*/
boolean acquire(String tableName);
+ /**
+ * Try to acquire a quota for the given logical table.
+ * @param logicalTableName Logical table name
+ * @return {@code true} if the table quota has not been reached, {@code
false} otherwise
+ */
+ boolean acquireLogicalTable(String logicalTableName);
+
/**
* Try to acquire a quota for the given database.
* @param databaseName database name
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 2367334497..2a344876c3 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -285,7 +285,8 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
* @return true if the query was successfully cancelled, false otherwise.
*/
protected abstract boolean handleCancel(long queryId, int timeoutMs,
Executor executor,
- HttpClientConnectionManager connMgr, Map<String, Integer>
serverResponses) throws Exception;
+ HttpClientConnectionManager connMgr, Map<String, Integer>
serverResponses)
+ throws Exception;
protected static void augmentStatistics(RequestContext statistics,
BrokerResponse response) {
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 7901772362..ab013085cb 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -366,8 +366,8 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
// Compile the request into PinotQuery
long compilationStartTimeNs = System.nanoTime();
CompileResult compileResult =
- compileRequest(requestId, query, sqlNodeAndOptions, request,
requesterIdentity, requestContext, httpHeaders,
- accessControl);
+ compileRequest(requestId, query, sqlNodeAndOptions, request,
requesterIdentity, requestContext, httpHeaders,
+ accessControl);
if (compileResult._errorOrLiteralOnlyBrokerResponse != null) {
/*
@@ -406,8 +406,17 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
}
// Validate QPS
- if (hasExceededQPSQuota(database, physicalTableNames, requestContext)) {
- String errorMessage = String.format("Request %d: %s exceeds query
quota.", requestId, query);
+ if (!_queryQuotaManager.acquireDatabase(database)) {
+ String errorMessage =
+ String.format("Request %d: %s exceeds query quota for database:
%s", requestId, query, database);
+ LOGGER.info(errorMessage);
+ requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
+ return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS,
errorMessage);
+ }
+ if (!_queryQuotaManager.acquireLogicalTable(tableName)) {
+ String errorMessage =
+ String.format("Request %d: %s exceeds query quota for table: %s.",
requestId, query, tableName);
+ requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS,
errorMessage);
}
@@ -815,9 +824,9 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
if (ParserUtils.canCompileWithMultiStageEngine(query, database,
_tableCache)) {
return new CompileResult(new
BrokerResponseNative(QueryErrorCode.SQL_PARSING,
"It seems that the query is only supported by the multi-stage
query engine, please retry the query "
- + "using "
- + "the multi-stage query engine "
- +
"(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)"));
+ + "using "
+ + "the multi-stage query engine "
+ +
"(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)"));
} else {
return new CompileResult(
new BrokerResponseNative(QueryErrorCode.SQL_PARSING,
e.getMessage()));
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 029f5c3491..eff90fcf49 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -884,7 +884,8 @@ public class ZKMetadataProvider {
return
propertyStore.exists(constructPropertyStorePathForLogical(tableName),
AccessOption.PERSISTENT);
}
- public static boolean isTableConfigExists(ZkHelixPropertyStore<ZNRecord>
propertyStore, String tableName) {
- return
propertyStore.exists(constructPropertyStorePathForResourceConfig(tableName),
AccessOption.PERSISTENT);
+ public static boolean isTableConfigExists(ZkHelixPropertyStore<ZNRecord>
propertyStore, String tableNameWithType) {
+ return
propertyStore.exists(constructPropertyStorePathForResourceConfig(tableNameWithType),
+ AccessOption.PERSISTENT);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index e619cbf008..7969d2aa86 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -38,6 +38,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
@@ -137,6 +138,17 @@ public class ControllerRequestClient {
}
}
+ public void addLogicalTableConfig(LogicalTableConfig logicalTableConfig)
+ throws IOException {
+ try {
+ HttpClient.wrapAndThrowHttpException(
+ _httpClient.sendJsonPostRequest(new
URI(_controllerRequestURLBuilder.forLogicalTableCreate()),
+ logicalTableConfig.toJsonString(), _headers));
+ } catch (HttpErrorStatusException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
public void updateTableConfig(TableConfig tableConfig)
throws IOException {
try {
@@ -148,6 +160,17 @@ public class ControllerRequestClient {
}
}
+ public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig)
+ throws IOException {
+ try {
+ HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPutRequest(
+ new
URI(_controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableConfig.getTableName())),
+ logicalTableConfig.toJsonString(), _headers));
+ } catch (HttpErrorStatusException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
public void toggleTableState(String tableName, TableType type, boolean
enable)
throws IOException {
try {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 892e3fbc11..09bd904aee 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -180,7 +180,6 @@ import org.apache.pinot.spi.config.user.UserConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeBoundaryConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
@@ -2152,13 +2151,7 @@ public class PinotHelixResourceManager {
updateBrokerResourceForLogicalTable(logicalTableConfig, tableName);
}
- TimeBoundaryConfig oldTimeBoundaryConfig =
oldLogicalTableConfig.getTimeBoundaryConfig();
- TimeBoundaryConfig newTimeBoundaryConfig =
logicalTableConfig.getTimeBoundaryConfig();
- // compare the old and new time boundary config and send message if they
are different
- if ((oldTimeBoundaryConfig != null &&
!oldTimeBoundaryConfig.equals(newTimeBoundaryConfig))
- || (oldTimeBoundaryConfig == null && newTimeBoundaryConfig != null)) {
- sendLogicalTableConfigRefreshMessage(logicalTableConfig.getTableName());
- }
+ sendLogicalTableConfigRefreshMessage(logicalTableConfig.getTableName());
LOGGER.info("Updated logical table {}: Successfully updated table",
tableName);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index cfd0f3f6cc..cb04aed946 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -407,7 +407,7 @@ public class ControllerTest {
.setBrokerTenant(brokerTenant)
.setRefOfflineTableName(offlineTableName)
.setRefRealtimeTableName(realtimeTableName)
- .setQuotaConfig(new QuotaConfig(null, "999"))
+ .setQuotaConfig(new QuotaConfig(null, "99999"))
.setQueryConfig(new QueryConfig(1L, true, false, null, 1L, 1L))
.setTimeBoundaryConfig(new TimeBoundaryConfig("min",
Map.of("includedTables", physicalTableNames)))
.setPhysicalTableConfigMap(physicalTableConfigMap);
@@ -752,11 +752,21 @@ public class ControllerTest {
getControllerRequestClient().addTableConfig(tableConfig);
}
+ public void addLogicalTableConfig(LogicalTableConfig logicalTableConfig)
+ throws IOException {
+ getControllerRequestClient().addLogicalTableConfig(logicalTableConfig);
+ }
+
public void updateTableConfig(TableConfig tableConfig)
throws IOException {
getControllerRequestClient().updateTableConfig(tableConfig);
}
+ public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig)
+ throws IOException {
+ getControllerRequestClient().updateLogicalTableConfig(logicalTableConfig);
+ }
+
public void toggleTableState(String tableName, TableType type, boolean
enable)
throws IOException {
getControllerRequestClient().toggleTableState(tableName, type, enable);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
index f3945bcf1f..f91a4a9148 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
import
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManagerTest;
@@ -35,10 +36,12 @@ import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -76,6 +79,12 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
TableConfig tableConfig = createOfflineTableConfig();
addTableConfig(tableConfig);
+ // Create and upload schema and logical table
+ schema.setSchemaName(getLogicalTableName());
+ addSchema(schema);
+ LogicalTableConfig logicalTableConfig = getLogicalTableConfig();
+ addLogicalTableConfig(logicalTableConfig);
+
Properties properties = new Properties();
properties.put(FAIL_ON_EXCEPTIONS, "FALSE");
_pinotClientTransport = new JsonAsyncHttpPinotClientTransportFactory()
@@ -96,9 +105,11 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
setQueryQuotaForApplication(null);
addQueryQuotaToDatabaseConfig(null);
addQueryQuotaToTableConfig(null);
+ addQueryQuotaToLogicalTableConfig(null);
_brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0);
verifyQuotaUpdate(0);
+ verifyQuotaUpdateWithTableName(0, getLogicalTableName());
}
@Test
@@ -258,6 +269,48 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
}
}
+ @Test
+ public void testLogicalTableQueryQuota()
+ throws Exception {
+ int maxQps = 10;
+ addQueryQuotaToLogicalTableConfig(maxQps);
+ verifyQuotaUpdateWithTableName(maxQps, getLogicalTableName());
+ runQueries(maxQps, false, "default", getLogicalTableName());
+ //increase the qps and some of the queries should be throttled.
+ runQueries(maxQps * 2, true, "default", getLogicalTableName());
+
+ // queries on broker
+ runQueriesOnBroker(maxQps, false, getLogicalTableName());
+ //increase the qps and some of the queries should be throttled.
+ runQueriesOnBroker(maxQps * 2, true, getLogicalTableName());
+ }
+
+ @Test
+ public void testLogicalTableWithDatabaseQueryQuota()
+ throws Exception {
+ int databaseMaxQps = 25;
+ int logicalTableMaxQps = 10;
+ addQueryQuotaToDatabaseConfig(databaseMaxQps);
+ addQueryQuotaToLogicalTableConfig(logicalTableMaxQps);
+ // table quota within database quota. Queries should fail upon table quota
(10 qps) breach
+ verifyQuotaUpdateWithTableName(logicalTableMaxQps, getLogicalTableName());
+ runQueries(logicalTableMaxQps, false, "default", getLogicalTableName());
+ // queries on broker
+ runQueriesOnBroker(logicalTableMaxQps, false, getLogicalTableName());
+
+ //increase the logical table qps.
+ logicalTableMaxQps = 50;
+ addQueryQuotaToLogicalTableConfig(logicalTableMaxQps);
+ verifyQuotaUpdateWithTableName(databaseMaxQps, getLogicalTableName());
+ runQueries(databaseMaxQps, false, "default", getLogicalTableName());
+ // broker queries
+ runQueriesOnBroker(databaseMaxQps, false, getLogicalTableName());
+
+ //increase the qps and some of the queries should be throttled.
+ runQueries(databaseMaxQps * 2, true, "default", getLogicalTableName());
+ runQueriesOnBroker(databaseMaxQps * 2, true, getLogicalTableName());
+ }
+
/**
* Runs the query load with the max rate that the quota can allow and
ensures queries are not failing.
* Then runs the query load with double the max rate and expects queries to
fail due to quota breach.
@@ -295,15 +348,18 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
private void runQueries(int qps, boolean shouldFail) {
runQueries(qps, shouldFail, "default");
}
+ private void runQueries(int qps, boolean shouldFail, String applicationName)
{
+ runQueries(qps, shouldFail, applicationName, getTableName());
+ }
// try to keep the qps below 50 to ensure that the time lost between 2 query
runs on top of the sleepMillis
// is not comparable to sleepMillis, else the actual qps would end up being
much lower than required qps
- private void runQueries(int qps, boolean shouldFail, String applicationName)
{
+ private void runQueries(int qps, boolean shouldFail, String applicationName,
String tableName) {
int failCount = 0;
boolean isLastFail = false;
long deadline = System.currentTimeMillis() + 1000;
- String query = "SET applicationName='" + applicationName + "'; SELECT
COUNT(*) FROM " + getTableName();
+ String query = "SET applicationName='" + applicationName + "'; SELECT
COUNT(*) FROM " + tableName;
for (int i = 0; i < qps; i++) {
sleep(deadline, qps - i);
@@ -333,11 +389,15 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
private static volatile String _quotaSource;
private void verifyQuotaUpdate(double quotaQps) {
+ verifyQuotaUpdateWithTableName(quotaQps, getTableName() + "_OFFLINE");
+ }
+
+ private void verifyQuotaUpdateWithTableName(double quotaQps, String
tableName) {
try {
TestUtils.waitForCondition(aVoid -> {
try {
double tableQuota = Double.parseDouble(sendGetRequest(
- "http://" + _brokerHostPort + "/debug/tables/queryQuota/" +
getTableName() + "_OFFLINE"));
+ "http://" + _brokerHostPort + "/debug/tables/queryQuota/" +
tableName));
double dbQuota = Double.parseDouble(
sendGetRequest("http://" + _brokerHostPort +
"/debug/databases/queryQuota/default"));
double appQuota = Double.parseDouble(
@@ -363,7 +423,7 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
} catch (IOException e) {
throw new RuntimeException(e);
}
- }, 10000, "Failed to reflect query quota on rate limiter in 5s.");
+ }, 10000, "Failed to reflect query quota on rate limiter in 10s.");
} catch (AssertionError ae) {
throw new AssertionError(
ae.getMessage() + " Expected quota:" + quotaQps + " but is: " +
_quota + " set on: " + _quotaSource, ae);
@@ -375,13 +435,17 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
}
private void runQueriesOnBroker(float qps, boolean shouldFail) {
+ runQueriesOnBroker(qps, shouldFail, getTableName());
+ }
+
+ private void runQueriesOnBroker(float qps, boolean shouldFail, String
tableName) {
int failCount = 0;
long deadline = System.currentTimeMillis() + 1000;
for (int i = 0; i < qps; i++) {
sleep(deadline, qps - i);
BrokerResponse resultSetGroup =
- executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*)
FROM " + getTableName());
+ executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*)
FROM " + tableName);
for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements();
it.hasNext(); ) {
JsonNode exception = it.next();
if (exception.get("errorCode").asInt() ==
QueryErrorCode.TOO_MANY_REQUESTS.getId()) {
@@ -406,6 +470,14 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
// to allow change propagation to QueryQuotaManager
}
+ public void addQueryQuotaToLogicalTableConfig(Integer maxQps)
+ throws Exception {
+ LogicalTableConfig logicalTableConfig = getLogicalTableConfig();
+ logicalTableConfig.setQuotaConfig(new QuotaConfig(null, maxQps == null ?
null : maxQps.toString()));
+ updateLogicalTableConfig(logicalTableConfig);
+ // to allow change propagation to QueryQuotaManager
+ }
+
public void addQueryQuotaToDatabaseConfig(Integer maxQps)
throws Exception {
String url = _controllerRequestURLBuilder.getBaseUrl() +
"/databases/default/quotas";
@@ -453,4 +525,16 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
}
// to allow change propagation to QueryQuotaManager
}
+
+ private static String getLogicalTableName() {
+ return "logical_table";
+ }
+
+ private LogicalTableConfig getLogicalTableConfig() {
+ List<String> physicalTableNames =
List.of(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()));
+ LogicalTableConfig logicalTableConfig =
+ getDummyLogicalTableConfig(getLogicalTableName(), physicalTableNames,
"DefaultTenant");
+ logicalTableConfig.setQuotaConfig(new QuotaConfig(null, null));
+ return logicalTableConfig;
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index 29c6f05516..7fb4802cc1 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -324,16 +324,6 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
return LogicalTableConfig.fromString(resp);
}
- protected void updateLogicalTableConfig(String logicalTableName,
LogicalTableConfig logicalTableConfig)
- throws IOException {
- String updateLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName);
- String resp =
- ControllerTest.sendPutRequest(updateLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
-
- assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" +
getLogicalTableName()
- + " logical table successfully updated.\"}");
- }
-
protected void deleteLogicalTable()
throws IOException {
String deleteLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableDelete(getLogicalTableName());
@@ -448,7 +438,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
QueryConfig queryConfig = new QueryConfig(null, false, null, null, null,
null);
LogicalTableConfig logicalTableConfig =
getLogicalTableConfig(getLogicalTableName());
logicalTableConfig.setQueryConfig(queryConfig);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
String groovyQuery = "SELECT
GROOVY('{\"returnType\":\"STRING\",\"isSingleValue\":true}', "
+ "'arg0 + arg1', FlightNum, Origin) FROM mytable";
@@ -460,7 +450,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
queryConfig = new QueryConfig(null, true, null, null, null, null);
logicalTableConfig.setQueryConfig(queryConfig);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
// grpc and http throw different exceptions. So only check error message.
Exception athrows = expectThrows(Exception.class, () ->
postQuery(groovyQuery));
@@ -468,7 +458,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
// Remove query config
logicalTableConfig.setQueryConfig(null);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
athrows = expectThrows(Exception.class, () -> postQuery(groovyQuery));
assertTrue(athrows.getMessage().contains("Groovy transform functions are
disabled for queries"));
@@ -482,7 +472,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
QueryConfig queryConfig = new QueryConfig(null, null, null, null, 100L,
null);
LogicalTableConfig logicalTableConfig =
getLogicalTableConfig(getLogicalTableName());
logicalTableConfig.setQueryConfig(queryConfig);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
JsonNode response = postQuery(starQuery);
JsonNode exceptions = response.get("exceptions");
@@ -492,7 +482,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
// Query Succeeds with a high limit.
queryConfig = new QueryConfig(null, null, null, null, 1000000L, null);
logicalTableConfig.setQueryConfig(queryConfig);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
response = postQuery(starQuery);
exceptions = response.get("exceptions");
assertTrue(exceptions.isEmpty(), "Query should not throw exception");
@@ -500,7 +490,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
//Reset to null.
queryConfig = new QueryConfig(null, null, null, null, null, null);
logicalTableConfig.setQueryConfig(queryConfig);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
response = postQuery(starQuery);
exceptions = response.get("exceptions");
assertTrue(exceptions.isEmpty(), "Query should not throw exception");
@@ -514,7 +504,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
QueryConfig queryConfig = new QueryConfig(null, null, null, null, null,
1000L);
LogicalTableConfig logicalTableConfig =
getLogicalTableConfig(getLogicalTableName());
logicalTableConfig.setQueryConfig(queryConfig);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
JsonNode response = postQuery(starQuery);
JsonNode exceptions = response.get("exceptions");
assertTrue(!exceptions.isEmpty()
@@ -523,7 +513,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
// Query Succeeds with a high limit.
queryConfig = new QueryConfig(null, null, null, null, null, 1000000L);
logicalTableConfig.setQueryConfig(queryConfig);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
response = postQuery(starQuery);
exceptions = response.get("exceptions");
assertTrue(exceptions.isEmpty(), "Query should not throw exception");
@@ -531,7 +521,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
//Reset to null.
queryConfig = new QueryConfig(null, null, null, null, null, null);
logicalTableConfig.setQueryConfig(queryConfig);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
response = postQuery(starQuery);
exceptions = response.get("exceptions");
assertTrue(exceptions.isEmpty(), "Query should not throw exception");
@@ -544,7 +534,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
QueryConfig queryConfig = new QueryConfig(1L, null, null, null, null,
null);
LogicalTableConfig logicalTableConfig =
getLogicalTableConfig(getLogicalTableName());
logicalTableConfig.setQueryConfig(queryConfig);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
JsonNode response = postQuery(starQuery);
JsonNode exceptions = response.get("exceptions");
assertTrue(
@@ -555,7 +545,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
// Query Succeeds with a high limit.
queryConfig = new QueryConfig(1000000L, null, null, null, null, null);
logicalTableConfig.setQueryConfig(queryConfig);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
response = postQuery(starQuery);
exceptions = response.get("exceptions");
assertTrue(exceptions.isEmpty(), "Query should not throw exception");
@@ -563,7 +553,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
//Reset to null.
queryConfig = new QueryConfig(null, null, null, null, null, null);
logicalTableConfig.setQueryConfig(queryConfig);
- updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
response = postQuery(starQuery);
exceptions = response.get("exceptions");
assertTrue(exceptions.isEmpty(), "Query should not throw exception");
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
index ba594a303c..cafa33a56d 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
@@ -76,6 +76,6 @@ public class
LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends B
logicalTableConfig.getTimeBoundaryConfig().setParameters(parameters);
logicalTableConfig.setQueryConfig(null);
- updateLogicalTableConfig(logicalTableConfig.getTableName(),
logicalTableConfig);
+ updateLogicalTableConfig(logicalTableConfig);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]